/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

//  $Id: jlf_execplantojoblist.cpp 9702 2013-07-17 19:08:07Z xlou $

#include "jlf_execplantojoblist.h"

#include <iostream>
#include <stack>
#include <iterator>
#include <algorithm>
#include <cassert>
#include <vector>
#include <set>
#include <map>
#include <climits>
#include <cmath>
#include <cstring>
using namespace std;

#include <boost/shared_ptr.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/split.hpp>
namespace ba = boost::algorithm;

#include "calpontexecutionplan.h"
#include "calpontselectexecutionplan.h"
#include "dbrm.h"
#include "filter.h"
#include "simplefilter.h"
#include "outerjoinonfilter.h"
#include "constantfilter.h"
#include "existsfilter.h"
#include "selectfilter.h"
#include "windowfunctioncolumn.h"
#include "aggregatecolumn.h"
#include "arithmeticcolumn.h"
#include "constantcolumn.h"
#include "functioncolumn.h"
#include "pseudocolumn.h"
#include "simplecolumn.h"
#include "simplecolumn_int.h"
#include "simplecolumn_uint.h"
#include "simplecolumn_decimal.h"
#include "returnedcolumn.h"
#include "treenodeimpl.h"
#include "calpontsystemcatalog.h"
#include "logicoperator.h"
#include "predicateoperator.h"
#include "simplescalarfilter.h"
using namespace execplan;

#include "configcpp.h"
using namespace config;

#include "messagelog.h"
#include "idberrorinfo.h"
#include "errorids.h"
using namespace logging;

#include "elementtype.h"
#include "joblist.h"
#include "jobstep.h"
#include "primitivestep.h"
#include "tuplehashjoin.h"
#include "tupleunion.h"
#include "expressionstep.h"
#include "tupleconstantstep.h"

#include "jlf_common.h"
#include "jlf_subquery.h"
#include "jlf_tuplejoblist.h"
#include "mcs_decimal.h"

#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wpotentially-evaluated-expression"
// for warnings on typeid :expression with side effects will be evaluated despite being used as an operand to
// 'typeid'
#endif

namespace
{
using namespace joblist;

// Terminal escape sequences wrapped around the diagnostics this file writes to cerr
// (see op2num/bop2num): boldStart switches bold on, boldStop restores the default colour.
const string boldStart = "\033[0;1m";
const string boldStop = "\033[0;39m";

// Stack of (non-owned) execution-plan tree nodes.
typedef stack<const TreeNode*> TreeNodeStack;
//@bug 598 self-join
// typedef std::pair<execplan::CalpontSystemCatalog::OID, std::string> OIDAliasPair;  //self-join

// Reference operators that op2num() and bop2num() compare an incoming operator against.
// Lower- and upper-case spellings are separate constants; both helpers test each spelling
// explicitly.
const Operator opeq("=");
const Operator oplt("<");
const Operator ople("<=");
const Operator opgt(">");
const Operator opge(">=");
const Operator opne("<>");
const Operator opand("and");
const Operator opAND("AND");
const Operator opor("or");
const Operator opOR("OR");
const Operator opxor("xor");
const Operator opXOR("XOR");
const Operator oplike("like");
const Operator opLIKE("LIKE");
const Operator opis("is");
const Operator opIS("IS");
const Operator opisnot("is not");
const Operator opISNOT("IS NOT");
const Operator opnotlike("not like");
const Operator opNOTLIKE("NOT LIKE");
const Operator opisnotnull("isnotnull");
const Operator opisnull("isnull");

// Forward declaration: builds the job steps for a SimpleFilter. The definition is not in
// this part of the file (presumably further down — confirm).
const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo);

/* This looks like an inefficient way to get NULL values. Much easier ways
   to do it. */
template <typename T>
void valueNullNum(const CalpontSystemCatalog::ColType& ct, const long timeZone, T& val)
{
  T& n = val;
  bool pushWarning = false;
  boost::any anyVal = ct.convertColumnData("", pushWarning, timeZone, true, false, false);

  switch (ct.colDataType)
  {
    case CalpontSystemCatalog::BIT:
      // n = boost::any_cast<bool>(anyVal);
      break;

    case CalpontSystemCatalog::TINYINT: n = boost::any_cast<char>(anyVal); break;

    case CalpontSystemCatalog::UTINYINT: n = boost::any_cast<uint8_t>(anyVal); break;

    case CalpontSystemCatalog::SMALLINT: n = boost::any_cast<short>(anyVal); break;

    case CalpontSystemCatalog::USMALLINT: n = boost::any_cast<uint16_t>(anyVal); break;

    case CalpontSystemCatalog::MEDINT:
    case CalpontSystemCatalog::INT: n = boost::any_cast<int>(anyVal); break;

    case CalpontSystemCatalog::UMEDINT:
    case CalpontSystemCatalog::UINT: n = boost::any_cast<uint32_t>(anyVal); break;

    case CalpontSystemCatalog::BIGINT: n = boost::any_cast<long long>(anyVal); break;

    case CalpontSystemCatalog::UBIGINT: n = boost::any_cast<uint64_t>(anyVal); break;

    case CalpontSystemCatalog::FLOAT:
    case CalpontSystemCatalog::UFLOAT:
    {
      float f = boost::any_cast<float>(anyVal);

      // N.B. There is a bug in boost::any or in gcc where, if you store a nan, you will get back a nan,
      //  but not necessarily the same bits that you put in. This only seems to be for float (double seems
      //  to work).
      if (isnan(f))
      {
        uint32_t ti = joblist::FLOATNULL;
        float* tfp = (float*)&ti;
        f = *tfp;
      }

      float* fp = &f;
      int32_t* ip = reinterpret_cast<int32_t*>(fp);
      n = *ip;
    }
    break;

    case CalpontSystemCatalog::DOUBLE:
    case CalpontSystemCatalog::UDOUBLE:
    {
      double d = boost::any_cast<double>(anyVal);
      double* dp = &d;
      int64_t* ip = reinterpret_cast<int64_t*>(dp);
      n = *ip;
    }
    break;

    case CalpontSystemCatalog::VARCHAR:
    case CalpontSystemCatalog::VARBINARY:
    case CalpontSystemCatalog::BLOB:
    case CalpontSystemCatalog::TEXT:
    case CalpontSystemCatalog::CLOB:
    case CalpontSystemCatalog::CHAR:

      // @bug 532 - where is null was not returning null rows for char(1) - char(8).
      // @Bug 972 Varchar should be treated differently from char
      if ((ct.colDataType == CalpontSystemCatalog::VARCHAR && ct.colWidth <= 7) ||
          (ct.colDataType == CalpontSystemCatalog::VARBINARY && ct.colWidth <= 7) ||
          (ct.colDataType == CalpontSystemCatalog::CHAR && ct.colWidth <= 8))
      {
        const string& i = boost::any_cast<string>(anyVal);

        // n = *((uint64_t *) i.c_str());
        /* this matches what dataconvert is returning; not valid to copy
         * 8 bytes every time. */
        if (ct.colDataType == CalpontSystemCatalog::CHAR)
        {
          switch (ct.colWidth)
          {
            case 1: n = *((uint8_t*)i.data()); break;

            case 2: n = *((uint16_t*)i.data()); break;

            case 3:
            case 4: n = *((uint32_t*)i.data()); break;

            default: n = *((uint64_t*)i.data()); break;
          }
        }
        else
        {
          switch (ct.colWidth)
          {
            case 1: n = *((uint16_t*)i.data()); break;

            case 2: n = *((uint32_t*)i.data()); break;

            default: n = *((uint64_t*)i.data()); break;
          }
        }
      }
      else
      {
        WriteEngine::Token t = boost::any_cast<WriteEngine::Token>(anyVal);
        n = *(uint64_t*)&t;
      }

      break;

    case CalpontSystemCatalog::DATE: n = boost::any_cast<uint32_t>(anyVal); break;

    case CalpontSystemCatalog::DATETIME: n = boost::any_cast<uint64_t>(anyVal); break;

    case CalpontSystemCatalog::TIMESTAMP: n = boost::any_cast<uint64_t>(anyVal); break;

    case CalpontSystemCatalog::TIME: n = boost::any_cast<int64_t>(anyVal); break;

    case CalpontSystemCatalog::DECIMAL:
    case CalpontSystemCatalog::UDECIMAL:
      if (LIKELY(ct.colWidth == datatypes::MAXDECIMALWIDTH))
        n = boost::any_cast<int128_t>(anyVal);
      else if (ct.colWidth == execplan::CalpontSystemCatalog::EIGHT_BYTE)
        n = boost::any_cast<long long>(anyVal);
      else if (ct.colWidth == execplan::CalpontSystemCatalog::FOUR_BYTE)
        n = boost::any_cast<int>(anyVal);
      else if (ct.colWidth == execplan::CalpontSystemCatalog::TWO_BYTE)
        n = boost::any_cast<short>(anyVal);
      else if (ct.colWidth == execplan::CalpontSystemCatalog::ONE_BYTE)
        n = boost::any_cast<char>(anyVal);

      break;

    default: break;
  }
}

/* Converts the literal `str` to the numeric representation used for a column of type `ct`.

   str       textual value to convert
   ct        column type; colDataType and colWidth select how the result is decoded
   isNull    when true the work is delegated to valueNullNum() and `rf` is NOT modified
   rf        out: 0 normally; ROUND_POS or ROUND_NEG when convertColumnData() raised its
             warning flag (for integer/decimal columns the sign is taken from `str`)
   timeZone  forwarded unchanged to convertColumnData()
   v         out: the converted value; stays 0 for data types / decimal widths without a case

   Floating point and character results are stored as their raw bit pattern, not as a
   numeric conversion. Throws boost::bad_any_cast if convertColumnData() returns a type
   other than the one expected for the column. */
template <typename T>
void convertValueNum(const string& str, const CalpontSystemCatalog::ColType& ct, bool isNull, uint8_t& rf,
                     const long timeZone, T& v)
{
  if (isNull)
  {
    valueNullNum(ct, timeZone, v);
    return;
  }

  v = 0;
  rf = 0;
  bool pushWarning = false;
  boost::any anyVal = ct.convertColumnData(str, pushWarning, timeZone, false, true, false);

  switch (ct.colDataType)
  {
    case CalpontSystemCatalog::BIT: v = boost::any_cast<bool>(anyVal); break;

    case CalpontSystemCatalog::TINYINT:
#if BOOST_VERSION >= 105200
      v = boost::any_cast<char>(anyVal);
#else
      v = boost::any_cast<int8_t>(anyVal);
#endif
      break;

    case CalpontSystemCatalog::UTINYINT: v = boost::any_cast<uint8_t>(anyVal); break;

    case CalpontSystemCatalog::SMALLINT: v = boost::any_cast<int16_t>(anyVal); break;

    case CalpontSystemCatalog::USMALLINT: v = boost::any_cast<uint16_t>(anyVal); break;

    case CalpontSystemCatalog::MEDINT:
    case CalpontSystemCatalog::INT:
      v = boost::any_cast<int32_t>(anyVal);
      break;

    case CalpontSystemCatalog::UMEDINT:
    case CalpontSystemCatalog::UINT:
      v = boost::any_cast<uint32_t>(anyVal);
      break;

    case CalpontSystemCatalog::BIGINT: v = boost::any_cast<long long>(anyVal); break;

    case CalpontSystemCatalog::UBIGINT: v = boost::any_cast<uint64_t>(anyVal); break;

    case CalpontSystemCatalog::FLOAT:
    case CalpontSystemCatalog::UFLOAT:
    {
      const float f = boost::any_cast<float>(anyVal);
      int32_t bits = 0;

      // N.B. There is a bug in boost::any or in gcc where, if you store a nan,
      //     you will get back a nan, but not necessarily the same bits that you put in.
      //     This only seems to be for float (double seems to work).
      // So for a NaN take the bit pattern straight from FLOATNULL; it never passes through
      // a float object and therefore cannot be altered on the way.
      if (isnan(f))
      {
        const uint32_t nullBits = joblist::FLOATNULL;
        std::memcpy(&bits, &nullBits, sizeof(bits));
      }
      else
      {
        std::memcpy(&bits, &f, sizeof(bits));
      }

      v = bits;
    }
    break;

    case CalpontSystemCatalog::DOUBLE:
    case CalpontSystemCatalog::UDOUBLE:
    {
      const double d = boost::any_cast<double>(anyVal);
      int64_t bits = 0;
      std::memcpy(&bits, &d, sizeof(bits));
      v = bits;
    }
    break;

    case CalpontSystemCatalog::CHAR:
    case CalpontSystemCatalog::VARCHAR:
    case CalpontSystemCatalog::VARBINARY:
    case CalpontSystemCatalog::BLOB:
    case CalpontSystemCatalog::TEXT:
    case CalpontSystemCatalog::CLOB:
    {
      string packed = boost::any_cast<string>(anyVal);
      // bug 1932, pad nulls up to the size of v
      packed.resize(sizeof(v), 0);

      // Take (at most) the first eight bytes as the value. The copy is bounded by the
      // string length so nothing beyond the buffer is read when v is narrower than 8 bytes.
      uint64_t word = 0;
      std::memcpy(&word, packed.data(), std::min(packed.size(), sizeof(word)));
      v = word;

      if (pushWarning)
        rf = ROUND_POS;
    }
    break;

    case CalpontSystemCatalog::DATE: v = boost::any_cast<uint32_t>(anyVal); break;

    case CalpontSystemCatalog::DATETIME: v = boost::any_cast<uint64_t>(anyVal); break;

    case CalpontSystemCatalog::TIMESTAMP: v = boost::any_cast<uint64_t>(anyVal); break;

    case CalpontSystemCatalog::TIME: v = boost::any_cast<int64_t>(anyVal); break;

    case CalpontSystemCatalog::DECIMAL:
    case CalpontSystemCatalog::UDECIMAL:
      // The stored type depends on the column width.
      if (LIKELY(ct.colWidth == datatypes::MAXDECIMALWIDTH))
        v = boost::any_cast<int128_t>(anyVal);
      else if (ct.colWidth == execplan::CalpontSystemCatalog::EIGHT_BYTE)
        v = boost::any_cast<long long>(anyVal);
      else if (ct.colWidth == execplan::CalpontSystemCatalog::FOUR_BYTE)
        v = boost::any_cast<int32_t>(anyVal);
      else if (ct.colWidth == execplan::CalpontSystemCatalog::TWO_BYTE)
        v = boost::any_cast<int16_t>(anyVal);
      else if (ct.colWidth == execplan::CalpontSystemCatalog::ONE_BYTE)
        v = boost::any_cast<char>(anyVal);

      break;

    default: break;
  }

  if ((ct.colDataType == CalpontSystemCatalog::TINYINT || ct.colDataType == CalpontSystemCatalog::SMALLINT ||
       ct.colDataType == CalpontSystemCatalog::MEDINT || ct.colDataType == CalpontSystemCatalog::INT ||
       ct.colDataType == CalpontSystemCatalog::BIGINT || ct.colDataType == CalpontSystemCatalog::DECIMAL ||
       ct.colDataType == CalpontSystemCatalog::UTINYINT ||
       ct.colDataType == CalpontSystemCatalog::USMALLINT || ct.colDataType == CalpontSystemCatalog::UMEDINT ||
       ct.colDataType == CalpontSystemCatalog::UINT || ct.colDataType == CalpontSystemCatalog::UBIGINT ||
       ct.colDataType == CalpontSystemCatalog::UDECIMAL) &&
      pushWarning)
  {
    // The warning flag was raised for an integer/decimal literal: record the direction
    // from the literal's sign. The sign is the first character that is not a blank, tab
    // or parenthesis; looking it up directly avoids copying and repeatedly erasing from
    // the string.
    const size_t signPos = str.find_first_not_of(" \t()");
    rf = (signPos != string::npos && str[signPos] == '-') ? ROUND_NEG : ROUND_POS;
  }
}

// TODO: make this totaly case-insensitive
int8_t op2num(const SOP& sop)
{
  if (*sop == opeq)
    return COMPARE_EQ;
  else if (*sop == oplt)
    return COMPARE_LT;
  else if (*sop == ople)
    return COMPARE_LE;
  else if (*sop == opgt)
    return COMPARE_GT;
  else if (*sop == opge)
    return COMPARE_GE;
  else if (*sop == opne)
    return COMPARE_NE;
  else if (*sop == oplike || *sop == opLIKE)
    return COMPARE_LIKE;
  else if (*sop == opis || *sop == opIS || *sop == opisnull)
    return COMPARE_EQ;
  else if (*sop == opisnot || *sop == opISNOT || *sop == opisnotnull)
    return COMPARE_NE;
  else if (*sop == opnotlike || *sop == opNOTLIKE)
    return COMPARE_NLIKE;
  else
    cerr << boldStart << "op2num: Unhandled operator >" << *sop << '<' << boldStop << endl;

  return COMPARE_NIL;
}

int8_t bop2num(const SOP& sop)
{
  if (*sop == opand || *sop == opAND)
    return BOP_AND;
  else if (*sop == opor || *sop == opOR)
    return BOP_OR;
  else if (*sop == opxor || *sop == opXOR)
    return BOP_XOR;
  else
    cerr << boldStart << "bop2num: Unhandled operator " << *sop << boldStop << endl;

  return BOP_NONE;
}

// Tag identifying the concrete class of an execution-plan tree node, as reported by
// TreeNode2Type(). Enumerators are numbered consecutively from zero.
enum TreeNodeType
{
  // generic node
  TREENODE = 0,

  // filters
  FILTER,
  CONSTANTFILTER,
  EXISTSFILTER,
  SELECTFILTER,
  SIMPLEFILTER,
  OUTERJOINONFILTER,

  // operators
  OPERATOR,

  // columns
  RETURNEDCOLUMN,
  AGGREGATECOLUMN,
  WINDOWFUNCTIONCOLUMN,
  ARITHMETICCOLUMN,
  CONSTANTCOLUMN,
  FUNCTIONCOLUMN,
  SIMPLECOLUMN,

  // miscellaneous
  TREENODEIMPL,
  SIMPLESCALARFILTER,

  // anything TreeNode2Type() does not recognise
  UNKNOWN,
};

// Classifies a tree node by its exact dynamic type (typeid equality, so a class derived
// from one of the listed classes does not match its base).
// Operator, PredicateOperator and LogicOperator all map to OPERATOR; SimpleColumn, its
// sized INT/UINT/Decimal variants and PseudoColumn all map to SIMPLECOLUMN.
// Returns UNKNOWN for any type not listed.
TreeNodeType TreeNode2Type(const TreeNode* tn)
{
  // Resolve the dynamic type once and compare it against each candidate.
  const std::type_info& actual = typeid(*tn);

  if (actual == typeid(TreeNode))
    return TREENODE;

  // filters
  if (actual == typeid(Filter))
    return FILTER;

  if (actual == typeid(ConstantFilter))
    return CONSTANTFILTER;

  if (actual == typeid(ExistsFilter))
    return EXISTSFILTER;

  if (actual == typeid(SelectFilter))
    return SELECTFILTER;

  if (actual == typeid(SimpleFilter))
    return SIMPLEFILTER;

  if (actual == typeid(OuterJoinOnFilter))
    return OUTERJOINONFILTER;

  // operators
  const bool isOperator = actual == typeid(Operator) || actual == typeid(PredicateOperator) ||
                          actual == typeid(LogicOperator);

  if (isOperator)
    return OPERATOR;

  // columns
  if (actual == typeid(ReturnedColumn))
    return RETURNEDCOLUMN;

  if (actual == typeid(AggregateColumn))
    return AGGREGATECOLUMN;

  if (actual == typeid(WindowFunctionColumn))
    return WINDOWFUNCTIONCOLUMN;

  if (actual == typeid(ArithmeticColumn))
    return ARITHMETICCOLUMN;

  if (actual == typeid(ConstantColumn))
    return CONSTANTCOLUMN;

  if (actual == typeid(FunctionColumn))
    return FUNCTIONCOLUMN;

  // every flavour of simple column
  const bool isPlainSimple = actual == typeid(SimpleColumn) || actual == typeid(PseudoColumn);

  const bool isSizedInt = actual == typeid(SimpleColumn_INT<1>) || actual == typeid(SimpleColumn_INT<2>) ||
                          actual == typeid(SimpleColumn_INT<4>) || actual == typeid(SimpleColumn_INT<8>) ||
                          actual == typeid(SimpleColumn_UINT<1>) || actual == typeid(SimpleColumn_UINT<2>) ||
                          actual == typeid(SimpleColumn_UINT<4>) || actual == typeid(SimpleColumn_UINT<8>);

  const bool isSizedDecimal =
      actual == typeid(SimpleColumn_Decimal<1>) || actual == typeid(SimpleColumn_Decimal<2>) ||
      actual == typeid(SimpleColumn_Decimal<4>) || actual == typeid(SimpleColumn_Decimal<8>);

  if (isPlainSimple || isSizedInt || isSizedDecimal)
    return SIMPLECOLUMN;

  // miscellaneous
  if (actual == typeid(TreeNodeImpl))
    return TREENODEIMPL;

  if (actual == typeid(SimpleScalarFilter))
    return SIMPLESCALARFILTER;

  return UNKNOWN;
}

// Builds the job steps for a filter that compares two columns (sc1 <op> sc2).
//
// The idea here is to take the two SC's and pipe them into a filter step.
// The output of the filter step is one DL that is the minimum rid list met the condition.
//
// Steps are returned in execution order:
//   scan(sc1) [, dictionary(sc1)] , scan(sc2) [, dictionary(sc2)] , filter
// A dictionary step is inserted for a column when isDictCol() reports a dictionary OID for
// it. The second scan takes the final output of column 1 as its input, and the filter
// step takes the final output of each column.
//
// sc1, sc2  the two columns being compared
// jobInfo   job context; tuple keys and dictionary bookkeeping are registered in it
// sop       comparison operator
// sf        the originating filter, attached to the FilterStep
//
// Throws QueryDataExcept(incompatFilterCols) when a dictionary column is involved and the
// two columns are not both character columns.
//
// Every step is owned by a shared pointer from the moment it is created, so nothing leaks
// if this function (or anything it calls) throws.
const JobStepVector doColFilter(const SimpleColumn* sc1, const SimpleColumn* sc2, JobInfo& jobInfo,
                                const SOP& sop, SimpleFilter* sf)
{
  CalpontSystemCatalog::OID tableOid1 = tableOid(sc1, jobInfo.csc);
  CalpontSystemCatalog::OID tableOid2 = tableOid(sc2, jobInfo.csc);
  string alias1(extractTableAlias(sc1));
  string alias2(extractTableAlias(sc2));
  CalpontSystemCatalog::ColType ct1 = sc1->colType();
  CalpontSystemCatalog::ColType ct2 = sc2->colType();
  const PseudoColumn* pc1 = dynamic_cast<const PseudoColumn*>(sc1);
  const PseudoColumn* pc2 = dynamic_cast<const PseudoColumn*>(sc2);

  // XXX use this before connector sets colType in sc correctly.
  //    type of pseudo column is set by connector
  if (!sc1->schemaName().empty() && sc1->isColumnStore() && !pc1)
  {
    ct1 = jobInfo.csc->colType(sc1->oid());
    ct1.charsetNumber = sc1->colType().charsetNumber;
  }
  if (!sc2->schemaName().empty() && sc2->isColumnStore() && !pc2)
  {
    ct2 = jobInfo.csc->colType(sc2->oid());
    ct2.charsetNumber = sc2->colType().charsetNumber;
  }
  // X
  int8_t op = op2num(sop);

  // Creates and labels the scan step for one side: a PseudoColStep for a pseudo column,
  // a plain pColStep otherwise.
  auto makeScanStep = [&jobInfo](const SimpleColumn* sc, const PseudoColumn* pc,
                                 CalpontSystemCatalog::OID tblOid, CalpontSystemCatalog::ColType& ct,
                                 const string& alias) -> boost::shared_ptr<pColStep>
  {
    boost::shared_ptr<pColStep> scan;

    if (pc == NULL)
      scan.reset(new pColStep(sc->oid(), tblOid, ct, jobInfo));
    else
      scan.reset(new PseudoColStep(sc->oid(), tblOid, pc->pseudoType(), ct, jobInfo));

    scan->alias(alias);
    scan->view(sc->viewName());
    scan->name(sc->columnName());
    scan->schema(sc->schemaName());
    scan->cardinality(sc->cardinality());
    scan->setFeederFlag(true);
    return scan;
  };

  // Creates and labels the dictionary step for one side (extra step for string columns
  // greater than eight bytes -- from token to string) and records which column the
  // dictionary OID belongs to.
  auto makeDictStep = [&jobInfo](const SimpleColumn* sc, CalpontSystemCatalog::OID dictOid,
                                 CalpontSystemCatalog::OID tblOid,
                                 CalpontSystemCatalog::ColType& ct) -> boost::shared_ptr<pDictionaryStep>
  {
    boost::shared_ptr<pDictionaryStep> dict(new pDictionaryStep(dictOid, tblOid, ct, jobInfo));
    jobInfo.keyInfo->dictOidToColOid[dictOid] = sc->oid();
    dict->alias(extractTableAlias(sc));
    dict->view(sc->viewName());
    dict->name(sc->columnName());
    dict->schema(sc->schemaName());
    dict->cardinality(sc->cardinality());
    return dict;
  };

  boost::shared_ptr<pColStep> pcs1 = makeScanStep(sc1, pc1, tableOid1, ct1, alias1);
  CalpontSystemCatalog::OID dictOid1 = isDictCol(ct1);

  boost::shared_ptr<pColStep> pcs2 = makeScanStep(sc2, pc2, tableOid2, ct2, alias2);
  CalpontSystemCatalog::OID dictOid2 = isDictCol(ct2);

  // Register the column tuples (column 1 first; the order of registration is kept as is).
  TupleInfo ti1(setTupleInfo(ct1, sc1->oid(), jobInfo, tableOid1, sc1, alias1));
  pcs1->tupleId(ti1.key);
  TupleInfo ti2(setTupleInfo(ct2, sc2->oid(), jobInfo, tableOid2, sc2, alias2));
  pcs2->tupleId(ti2.key);

  // check if they are string columns greater than 8 bytes.
  const bool dict1 = isDictCol(ct1) != 0;
  const bool dict2 = isDictCol(ct2) != 0;

  // A dictionary column can only be compared when both sides are character columns.
  if ((dict1 || dict2) && !(isCharCol(ct1) && isCharCol(ct2)))
  {
    cerr << boldStart << "Filterstep: Filter with different type is not supported " << boldStop << endl;
    throw QueryDataExcept("Filter with different column types is not supported.", incompatFilterCols);
  }

  // ---- column 1: scan output, optionally routed through its dictionary step ----
  AnyDataListSPtr scanOut1(new AnyDataList());
  {
    JobStepAssociation scanAssoc;
    scanAssoc.outAdd(scanOut1);
    pcs1->outputAssociation(scanAssoc);
  }

  boost::shared_ptr<pDictionaryStep> pdss1;
  AnyDataListSPtr filterIn1 = scanOut1;  // what column 1 finally delivers

  if (dict1)
  {
    pdss1 = makeDictStep(sc1, dictOid1, tableOid1, ct1);
    filterIn1 = AnyDataListSPtr(new AnyDataList());

    JobStepAssociation dictOutAssoc;
    dictOutAssoc.outAdd(filterIn1);
    pdss1->outputAssociation(dictOutAssoc);

    JobStepAssociation dictInAssoc;
    dictInAssoc.outAdd(scanOut1);
    pdss1->inputAssociation(dictInAssoc);
  }

  // ---- column 2: its scan is fed by column 1's final output ----
  AnyDataListSPtr scanOut2(new AnyDataList());
  {
    JobStepAssociation scanAssoc;
    scanAssoc.outAdd(scanOut2);
    pcs2->outputAssociation(scanAssoc);

    JobStepAssociation feedAssoc;
    feedAssoc.outAdd(filterIn1);
    pcs2->inputAssociation(feedAssoc);
  }

  boost::shared_ptr<pDictionaryStep> pdss2;
  AnyDataListSPtr filterIn2 = scanOut2;  // what column 2 finally delivers

  if (dict2)
  {
    pdss2 = makeDictStep(sc2, dictOid2, tableOid2, ct2);
    filterIn2 = AnyDataListSPtr(new AnyDataList());

    JobStepAssociation dictInAssoc;
    dictInAssoc.outAdd(scanOut2);
    pdss2->inputAssociation(dictInAssoc);

    JobStepAssociation dictOutAssoc;
    dictOutAssoc.outAdd(filterIn2);
    pdss2->outputAssociation(dictOutAssoc);
  }

  // ---- the filter step comparing both final outputs ----
  boost::shared_ptr<FilterStep> filt(new FilterStep(ct1, jobInfo));
  filt->alias(extractTableAlias(sc1));
  filt->tableOid(tableOid1);

  // NOTE(review): the filter step has only ever been given a name when both columns, or
  // neither, use a dictionary; the two mixed cases leave it unnamed. That is kept as is
  // -- confirm whether the mixed cases should be named as well.
  if (dict1 == dict2)
    filt->name(pcs1->name() + "," + pcs2->name());

  filt->view(pcs1->view());
  filt->schema(pcs1->schema());
  filt->addFilter(sf);

  if (op)
    filt->setBOP(op);

  {
    JobStepAssociation filterInAssoc;
    filterInAssoc.outAdd(filterIn1);
    filterInAssoc.outAdd(filterIn2);
    filt->inputAssociation(filterInAssoc);
  }

  // ---- hand the steps over in execution order ----
  JobStepVector jsv;
  jsv.push_back(pcs1);

  if (dict1)
    jsv.push_back(pdss1);

  jsv.push_back(pcs2);

  if (dict2)
    jsv.push_back(pdss2);

  jsv.push_back(filt);

  // ---- register the dictionary tuples and link them to their token columns ----
  if (dict1)
  {
    TupleInfo dictTi(setTupleInfo(ct1, dictOid1, jobInfo, tableOid1, sc1, alias1));
    pdss1->tupleId(dictTi.key);
    jobInfo.keyInfo->dictKeyMap[pcs1->tupleId()] = dictTi.key;
    jobInfo.tokenOnly[pcs1->tupleId()] = false;
  }

  if (dict2)
  {
    TupleInfo dictTi(setTupleInfo(ct2, dictOid2, jobInfo, tableOid2, sc2, alias2));
    pdss2->tupleId(dictTi.key);
    jobInfo.keyInfo->dictKeyMap[pcs2->tupleId()] = dictTi.key;
    jobInfo.tokenOnly[pcs2->tupleId()] = false;
  }

  return jsv;
}

// Builds a single ExpressionStep that evaluates "sc1 <sop> sc2".
//
// sc1, sc2  operands; each is cloned and the clone handed to a temporary SimpleFilter
// jobInfo   job context passed to the ExpressionStep
// sop       comparison operator
//
// Returns a vector containing just that step.
const JobStepVector doFilterExpression(const SimpleColumn* sc1, const SimpleColumn* sc2, JobInfo& jobInfo,
                                       const SOP& sop)
{
  // `new` reports failure by throwing, so no null check is needed. The step goes straight
  // into its owning pointer so it is released if anything below throws.
  ExpressionStep* es = new ExpressionStep(jobInfo);
  SJSTEP sjstep(es);

  // NOTE(review): sf lives only until this function returns, so expressionFilter()
  // presumably copies what it needs from it -- confirm.
  SimpleFilter sf;
  sf.op(sop);
  sf.lhs(sc1->clone());
  sf.rhs(sc2->clone());
  es->expressionFilter(&sf, jobInfo);

  JobStepVector jsv;
  jsv.push_back(sjstep);

  return jsv;
}

// Translate a column-to-column filter `sc1 <sop> sc2` into job steps.
//
// Outcomes visible in this function:
//   * same table OID, alias and view, and no joinInfo flags on either column:
//       - expression filter (doFilterExpression) when sc1 has no schema name or the column
//         types are not compatible,
//       - otherwise a column filter built by doColFilter();
//   * different tables with incompatible column types: expression filter, and the two table
//     keys are recorded in jobInfo.incompatibleJoinMap in both directions;
//   * otherwise: an optional pColStep/PseudoColStep per side (only when that side has a schema
//     name) followed by one TupleHashJoinStep, which is always the last element returned.
//
// sc1, sc2  the two join columns
// jobInfo   job context; join counters, tuple keys and dictionary key maps are updated here
// sop       comparison operator, only used for the filter fallbacks
// sf        the originating filter, only forwarded to doColFilter()
//
// Throws runtime_error when sc1 is a VARBINARY or BLOB column.
// NOTE(review): only sc1's result type is checked for VARBINARY/BLOB, sc2's is not -- confirm
// whether that is intended.
const JobStepVector doJoin(SimpleColumn* sc1, SimpleColumn* sc2, JobInfo& jobInfo, const SOP& sop,
                           SimpleFilter* sf)
{
  if ((sc1->resultType().colDataType == CalpontSystemCatalog::VARBINARY ||
       sc1->resultType().colDataType == CalpontSystemCatalog::BLOB))
    throw runtime_error("VARBINARY/BLOB in join is not supported.");

  // The idea here is to take the two SC's and pipe them into a HJ step. The output of the HJ step
  // is 2 DL's (one for each table) that are the minimum rid list for each side of the join.
  CalpontSystemCatalog::OID tableOid1 = tableOid(sc1, jobInfo.csc);
  CalpontSystemCatalog::OID tableOid2 = tableOid(sc2, jobInfo.csc);
  string alias1(extractTableAlias(sc1));
  string alias2(extractTableAlias(sc2));
  string view1(sc1->viewName());
  string view2(sc2->viewName());
  string schema1(sc1->schemaName());
  string schema2(sc2->schemaName());

  CalpontSystemCatalog::ColType ct1 = sc1->colType();
  CalpontSystemCatalog::ColType ct2 = sc2->colType();
  // Non-null when the column is a PseudoColumn; selects PseudoColStep below.
  PseudoColumn* pc1 = dynamic_cast<PseudoColumn*>(sc1);
  PseudoColumn* pc2 = dynamic_cast<PseudoColumn*>(sc2);

  // XXX use this before connector sets colType in sc correctly.
  //    type of pseudo column is set by connector
  // For regular column-store columns the type is re-read from the system catalog, keeping
  // only the charset number supplied on the SimpleColumn.
  if (!sc1->schemaName().empty() && sc1->isColumnStore() && !pc1)
  {
    ct1 = jobInfo.csc->colType(sc1->oid());
    ct1.charsetNumber = sc1->colType().charsetNumber;
  }
  if (!sc2->schemaName().empty() && sc2->isColumnStore() && !pc2)
  {
    ct2 = jobInfo.csc->colType(sc2->oid());
    ct2.charsetNumber = sc2->colType().charsetNumber;
  }
  // X
  // Union of the join flags of both sides; zero means none of the JOIN_* bits tested
  // further down is set.
  uint64_t joinInfo = sc1->joinInfo() | sc2->joinInfo();

  // Columns without a schema name are handed to updateDerivedColumn(), which may adjust the
  // column type (presumably these are derived-table / subquery columns -- confirm).
  if (sc1->schemaName().empty())
  {
    SimpleColumn* tmp = (sc1);
    updateDerivedColumn(jobInfo, tmp, ct1);
  }

  if (sc2->schemaName().empty())
  {
    SimpleColumn* tmp = (sc2);
    updateDerivedColumn(jobInfo, tmp, ct2);
  }

  //@bug 566 Dont do a hash join if the table and the alias are the same
  // Bug 590
  if (tableOid1 == tableOid2 && alias1 == alias2 && view1 == view2 && joinInfo == 0)
  {
    // Same table instance: this is a filter, not a join.
    if (sc1->schemaName().empty() || !compatibleColumnTypes(ct1, ct2, false))
    {
      return doFilterExpression(sc1, sc2, jobInfo, sop);
    }

    JobStepVector colFilter = doColFilter(sc1, sc2, jobInfo, sop, sf);
    // jsv.insert(jsv.end(), colFilter.begin(), colFilter.end());
    return colFilter;
  }

  // different tables
  if (!compatibleColumnTypes(ct1, ct2, true))
  {
    // Types cannot be hash-joined: fall back to an expression and remember the table pair.
    JobStepVector jsv;
    jsv = doFilterExpression(sc1, sc2, jobInfo, sop);
    uint32_t t1 = makeTableKey(jobInfo, sc1);
    uint32_t t2 = makeTableKey(jobInfo, sc2);
    jobInfo.incompatibleJoinMap[t1] = t2;
    jobInfo.incompatibleJoinMap[t2] = t1;

    return jsv;
  }

  // Column step for side 1; stays NULL when sc1 has no schema name.
  pColStep* pcs1 = NULL;
  CalpontSystemCatalog::OID oid1 = sc1->oid();
  CalpontSystemCatalog::OID dictOid1 = isDictCol(ct1);

  if (sc1->schemaName().empty() == false)
  {
    if (pc1 == NULL)
      pcs1 = new pColStep(oid1, tableOid1, ct1, jobInfo);
    else
      pcs1 = new PseudoColStep(oid1, tableOid1, pc1->pseudoType(), ct1, jobInfo);

    pcs1->alias(alias1);
    pcs1->view(view1);
    pcs1->name(sc1->columnName());
    pcs1->schema(sc1->schemaName());
    pcs1->cardinality(sc1->cardinality());
  }

  // Column step for side 2; stays NULL when sc2 has no schema name.
  pColStep* pcs2 = NULL;
  CalpontSystemCatalog::OID oid2 = sc2->oid();
  CalpontSystemCatalog::OID dictOid2 = isDictCol(ct2);

  if (sc2->schemaName().empty() == false)
  {
    if (pc2 == NULL)
      pcs2 = new pColStep(oid2, tableOid2, ct2, jobInfo);
    else
      pcs2 = new PseudoColStep(oid2, tableOid2, pc2->pseudoType(), ct2, jobInfo);

    pcs2->alias(alias2);
    pcs2->view(view2);
    pcs2->name(sc2->columnName());
    pcs2->schema(sc2->schemaName());
    pcs2->cardinality(sc2->cardinality());
  }

  // Join type derived from the returnAll() flags: exactly one side set selects an outer
  // join, both or neither leaves INNER.
  JoinType jt = INNER;

  if (sc1->returnAll() && sc2->returnAll())
  {
    // Due to current connector limitation, INNER may have both returnAll set.
    // @bug3037, compound outer join may have both flag set if not the 1st pair.
    jt = INNER;
  }
  else if (sc1->returnAll())
  {
    jt = LEFTOUTER;
  }
  else if (sc2->returnAll())
  {
    jt = RIGHTOUTER;
  }

  // Associate the steps
  JobStepVector jsv;
  SJSTEP step;

  // bug 1495 compound join, v-table handles string join and compound join the same way
  // One RowGroupDL per side; it is the output of the column step (when there is one) and an
  // input of the hash join.
  AnyDataListSPtr spdl1(new AnyDataList());
  RowGroupDL* dl1 = new RowGroupDL(1, jobInfo.fifoSize);
  spdl1->rowGroupDL(dl1);
  dl1->OID(oid1);

  if (pcs1)
  {
    JobStepAssociation outJs1;
    outJs1.outAdd(spdl1);
    pcs1->outputAssociation(outJs1);

    step.reset(pcs1);
    jsv.push_back(step);
  }

  AnyDataListSPtr spdl2(new AnyDataList());
  RowGroupDL* dl2 = new RowGroupDL(1, jobInfo.fifoSize);
  spdl2->rowGroupDL(dl2);
  dl2->OID(oid2);

  if (pcs2)
  {
    JobStepAssociation outJs2;
    outJs2.outAdd(spdl2);
    pcs2->outputAssociation(outJs2);

    step.reset(pcs2);
    jsv.push_back(step);
  }

  TupleHashJoinStep* thj = new TupleHashJoinStep(jobInfo);
  thj->tableOid1(tableOid1);
  thj->tableOid2(tableOid2);
  thj->alias1(alias1);
  thj->alias2(alias2);
  thj->view1(view1);
  thj->view2(view2);
  thj->schema1(schema1);
  thj->schema2(schema2);
  thj->oid1(oid1);
  thj->oid2(oid2);
  thj->dictOid1(dictOid1);
  thj->dictOid2(dictOid2);
  thj->sequence1(sc1->sequence());
  thj->sequence2(sc2->sequence());
  thj->column1(sc1);
  thj->column2(sc2);

  // MCOL-334 joins in views need to have higher priority than SEMI/ANTI
  if (!view1.empty() && view1 == view2)
  {
    // MCOL-5061. We could have a filters to be associated with a specific join id, therefore we cannot have
    // the same join id for different `TupleHashJoin` steps.
    // In-view joins get ids counted up from the smallest int64_t value.
    thj->joinId(std::numeric_limits<int64_t>::min() + (++jobInfo.joinNumInView));
  }
  else
  {
    // Plain joins get the next positive id; joins carrying any joinInfo flag get id 0.
    thj->joinId((joinInfo == 0) ? (++jobInfo.joinNum) : 0);
  }

  // Check if SEMI/ANTI join.
  // INNER/OUTER join and SEMI/ANTI are mutually exclusive,
  if (joinInfo != 0)
  {
    // @bug3998, keep the OUTER join type
    // jt = INIT;

    // Each JOIN_* flag adds the corresponding bit on top of the INNER/OUTER type chosen above.
    if (joinInfo & JOIN_SEMI)
      jt |= SEMI;

    if (joinInfo & JOIN_ANTI)
      jt |= ANTI;

    if (joinInfo & JOIN_SCALAR)
      jt |= SCALAR;

    if (joinInfo & JOIN_NULL_MATCH)
      jt |= MATCHNULLS;

    if (joinInfo & JOIN_CORRELATED)
      jt |= CORRELATED;

    if (joinInfo & JOIN_OUTER_SELECT)
      jt |= LARGEOUTER;

    // Side 1 wins when both columns carry the correlated flag.
    if (sc1->joinInfo() & JOIN_CORRELATED)
      thj->correlatedSide(1);
    else if (sc2->joinInfo() & JOIN_CORRELATED)
      thj->correlatedSide(2);
  }

  thj->setJoinType(jt);

  JobStepAssociation outJs3;
  outJs3.outAdd(spdl1);
  outJs3.outAdd(spdl2);
  thj->inputAssociation(outJs3);
  // `step` now holds the hash join; it is appended to jsv only at the very end.
  step.reset(thj);

  // setTupleInfo() is called for side 1 even when there is no column step for it.
  TupleInfo ti1(setTupleInfo(ct1, oid1, jobInfo, tableOid1, sc1, alias1));

  if (pcs1)
  {
    pcs1->tupleId(ti1.key);
    thj->tupleId1(ti1.key);

    if (dictOid1 > 0)
    {
      // Dictionary column: also register the dictionary tuple and link it to the column key.
      ti1 = setTupleInfo(ct1, dictOid1, jobInfo, tableOid1, sc1, alias1);
      jobInfo.keyInfo->dictOidToColOid[dictOid1] = oid1;
      jobInfo.keyInfo->dictKeyMap[pcs1->tupleId()] = ti1.key;
      jobInfo.tokenOnly[pcs1->tupleId()] = false;
      //			thj->tupleId1(ti1.key);
    }
  }
  else
  {
    thj->tupleId1(getTupleKey(jobInfo, sc1));
  }

  TupleInfo ti2(setTupleInfo(ct2, oid2, jobInfo, tableOid2, sc2, alias2));

  if (pcs2)
  {
    pcs2->tupleId(ti2.key);
    thj->tupleId2(pcs2->tupleId());

    if (dictOid2 > 0)
    {
      // This inner ti2 shadows the outer one (side 1 reassigns ti1 instead); only its key
      // is used, for the dictionary key map.
      TupleInfo ti2(setTupleInfo(ct2, dictOid2, jobInfo, tableOid2, sc2, alias2));
      jobInfo.keyInfo->dictOidToColOid[dictOid2] = oid2;
      jobInfo.keyInfo->dictKeyMap[pcs2->tupleId()] = ti2.key;
      jobInfo.tokenOnly[pcs2->tupleId()] = false;
      //			thj->tupleId2(ti2.key);
    }
  }
  else
  {
    thj->tupleId2(getTupleKey(jobInfo, sc2));
  }

  // The hash join step goes last; doSimpleFilter() relies on that when it sets the cardinality
  // on the final element of the returned vector.
  jsv.push_back(step);

  return jsv;
}

// Build the job steps joining an outer-query column against a sub-query result column.
//
// sc       outer-query column; its joinInfo() flags determine the join type bits
// rc       sub-query side; only its sequence() and resultType() are read here
// jobInfo  job context; tuple keys and dictionary key maps are updated for sc
//
// Returns the column step for sc (emitted only when sc has a schema name) followed by the
// TupleHashJoinStep.  Throws runtime_error when sc is a VARBINARY or BLOB column.
const JobStepVector doSemiJoin(const SimpleColumn* sc, const ReturnedColumn* rc, JobInfo& jobInfo)
{
  switch (sc->resultType().colDataType)
  {
    case CalpontSystemCatalog::VARBINARY:
    case CalpontSystemCatalog::BLOB: throw runtime_error("VARBINARY/BLOB in join is not supported.");

    default: break;
  }

  CalpontSystemCatalog::OID outerTableOid = tableOid(sc, jobInfo.csc);
  CalpontSystemCatalog::OID subTableOid = execplan::CNX_VTABLE_ID;
  string outerAlias(extractTableAlias(sc));
  string subAlias(jobInfo.subAlias);
  CalpontSystemCatalog::ColType outerType = sc->colType();
  // Not used further in this function.
  CalpontSystemCatalog::ColType ct2 = rc->resultType();
  const PseudoColumn* pseudo = dynamic_cast<const PseudoColumn*>(sc);
  const bool hasSchema = !sc->schemaName().empty();

  // XXX use this before connector sets colType in sc correctly.
  //    type of pseudo column is set by connector
  if (hasSchema && sc->isColumnStore() && pseudo == NULL)
  {
    outerType = jobInfo.csc->colType(sc->oid());
    outerType.charsetNumber = sc->colType().charsetNumber;
  }
  // X

  JobStepVector steps;
  SJSTEP holder;

  CalpontSystemCatalog::OID outerDictOid = 0;
  // Both keys start out as the all-ones value; the sub-query key is never replaced here.
  uint64_t outerKey = -1;
  uint64_t subKey = -1;

  if (hasSchema)
  {
    // Column step reading the outer column.
    pColStep* colStep = NULL;

    if (pseudo != NULL)
      colStep = new PseudoColStep(sc->oid(), outerTableOid, pseudo->pseudoType(), outerType, jobInfo);
    else
      colStep = new pColStep(sc->oid(), outerTableOid, outerType, jobInfo);

    outerDictOid = isDictCol(outerType);
    colStep->alias(outerAlias);
    colStep->view(sc->viewName());
    colStep->name(sc->columnName());
    colStep->schema(sc->schemaName());
    colStep->cardinality(sc->cardinality());

    holder.reset(colStep);
    steps.push_back(holder);

    TupleInfo colInfo(setTupleInfo(outerType, sc->oid(), jobInfo, outerTableOid, sc, outerAlias));
    colStep->tupleId(colInfo.key);
    outerKey = colInfo.key;

    if (outerDictOid > 0)
    {
      // Dictionary column: register the dictionary tuple and link it to the column key.
      colInfo = setTupleInfo(outerType, outerDictOid, jobInfo, outerTableOid, sc, outerAlias);
      jobInfo.keyInfo->dictOidToColOid[outerDictOid] = sc->oid();
      jobInfo.keyInfo->dictKeyMap[outerKey] = colInfo.key;
      jobInfo.tokenOnly[colStep->tupleId()] = false;
    }
  }

  TupleHashJoinStep* hashJoin = new TupleHashJoinStep(jobInfo);
  hashJoin->tableOid1(outerTableOid);
  hashJoin->tableOid2(subTableOid);
  hashJoin->alias1(outerAlias);
  hashJoin->view1(sc->viewName());
  hashJoin->schema1(sc->schemaName());
  hashJoin->oid1(sc->oid());
  hashJoin->oid2(subTableOid + 1 + rc->sequence());
  hashJoin->alias2(subAlias);
  hashJoin->dictOid1(outerDictOid);
  hashJoin->dictOid2(0);
  hashJoin->sequence1(sc->sequence());
  hashJoin->sequence2(rc->sequence());
  hashJoin->column1(sc);
  hashJoin->column2(rc);
  hashJoin->joinId(0);
  hashJoin->tupleId1(outerKey);
  hashJoin->tupleId2(subKey);

  // Translate the column's JOIN_* flags into join type bits, starting from INIT.
  const uint64_t flags = sc->joinInfo();
  JoinType joinType = INIT;

  if ((flags & JOIN_SEMI) != 0)
    joinType |= SEMI;

  if ((flags & JOIN_ANTI) != 0)
    joinType |= ANTI;

  if ((flags & JOIN_SCALAR) != 0)
    joinType |= SCALAR;

  if ((flags & JOIN_NULL_MATCH) != 0)
    joinType |= MATCHNULLS;

  if ((flags & JOIN_CORRELATED) != 0)
    joinType |= CORRELATED;

  if ((flags & JOIN_OUTER_SELECT) != 0)
    joinType |= LARGEOUTER;

  hashJoin->setJoinType(joinType);

  // The outer column (side 1) is always the correlated side.
  hashJoin->correlatedSide(1);

  holder.reset(hashJoin);
  steps.push_back(holder);
  return steps;
}

// Create a TupleHashJoinStep from the function-join info attached to an expression step.
//
// es       expression step (asserted non-null); it is flagged via functionJoin(true)
// jobInfo  job context; updateTableKey() is called for both join keys
//
// Returns the new hash join step wrapped in an SJSTEP.
SJSTEP expressionToFuncJoin(ExpressionStep* es, JobInfo& jobInfo)
{
  idbassert(es);
  boost::shared_ptr<FunctionJoinInfo> joinInfoPtr = es->functionJoinInfo();
  es->functionJoin(true);

  FunctionJoinInfo& info = *joinInfoPtr;
  TupleHashJoinStep* hashJoin = new TupleHashJoinStep(jobInfo);

  // Index 0 of every FunctionJoinInfo array feeds the "1" setters, index 1 the "2" setters.
  hashJoin->tableOid1(info.fTableOid[0]);
  hashJoin->tableOid2(info.fTableOid[1]);
  hashJoin->oid1(info.fOid[0]);
  hashJoin->oid2(info.fOid[1]);
  hashJoin->alias1(info.fAlias[0]);
  hashJoin->alias2(info.fAlias[1]);
  hashJoin->view1(info.fView[0]);
  hashJoin->view2(info.fView[1]);
  hashJoin->schema1(info.fSchema[0]);
  hashJoin->schema2(info.fSchema[1]);
  hashJoin->column1(info.fExpression[0]);
  hashJoin->column2(info.fExpression[1]);
  hashJoin->sequence1(info.fSequence[0]);
  hashJoin->sequence2(info.fSequence[1]);
  hashJoin->joinId(info.fJoinId);
  hashJoin->setJoinType(info.fJoinType);
  hashJoin->correlatedSide(info.fCorrelatedSide);
  hashJoin->funcJoinInfo(joinInfoPtr);
  hashJoin->tupleId1(info.fJoinKey[0]);
  hashJoin->tupleId2(info.fJoinKey[1]);

  updateTableKey(info.fJoinKey[0], info.fTableKey[0], jobInfo);
  updateTableKey(info.fJoinKey[1], info.fTableKey[1], jobInfo);

  SJSTEP result(hashJoin);
  return result;
}

// Wrap a parse (sub)tree in a single ExpressionStep.
//
// n        root of the expression; forwarded to ExpressionStep::expressionFilter()
// jobInfo  job context used to construct and configure the step
//
// Returns a vector containing just that step.
const JobStepVector doExpressionFilter(const ParseTree* n, JobInfo& jobInfo)
{
  // Put the step under SJSTEP ownership before configuring it, so it is not leaked when
  // expressionFilter() throws.  A plain `new` reports failure by throwing, so a null check
  // on its result can never fire and is omitted.
  ExpressionStep* es = new ExpressionStep(jobInfo);
  SJSTEP sjstep(es);
  es->expressionFilter(n, jobInfo);

  JobStepVector jsv;
  jsv.push_back(sjstep);

  return jsv;
}

// Wrap a filter in an ExpressionStep and, when it qualifies, add a function-join step.
//
// f        filter to evaluate; forwarded to ExpressionStep::expressionFilter()
// jobInfo  job context used to construct and configure the steps
//
// Returns the ExpressionStep, optionally followed by the TupleHashJoinStep produced by
// expressionToFuncJoin().  The second step is added only when f is a SimpleFilter, one of
// its operands carries join info (the left operand is considered first), that operand is an
// ArithmeticColumn, FunctionColumn or SimpleColumn, and the expression step reports
// function-join info.
const JobStepVector doExpressionFilter(const Filter* f, JobInfo& jobInfo)
{
  // Put the step under SJSTEP ownership before configuring it, so it is not leaked when
  // expressionFilter() throws.  A plain `new` reports failure by throwing, so a null check
  // on its result can never fire and is omitted.
  ExpressionStep* es = new ExpressionStep(jobInfo);
  SJSTEP sjstep(es);
  es->expressionFilter(f, jobInfo);

  JobStepVector jsv;
  jsv.push_back(sjstep);

  // @bug3683, support in/exists subquery function join
  const SimpleFilter* sf = dynamic_cast<const SimpleFilter*>(f);

  if (sf != nullptr)
  {
    const ReturnedColumn* lrc = static_cast<const ReturnedColumn*>(sf->lhs());
    const ReturnedColumn* rrc = static_cast<const ReturnedColumn*>(sf->rhs());

    // Pick the operand that carries the join info.  The left side takes precedence; the
    // right side is only looked at when the left one has no join info.
    const ReturnedColumn* joinSide = nullptr;

    if (lrc != nullptr && lrc->joinInfo())
      joinSide = lrc;
    else if (rrc != nullptr && rrc->joinInfo())
      joinSide = rrc;

    if (joinSide != nullptr)
    {
      // The type test must look at the operand that actually carries the join info.  The
      // previous code inspected the left operand in the right-hand branch as well.
      const bool joinable = dynamic_cast<const ArithmeticColumn*>(joinSide) != nullptr ||
                            dynamic_cast<const FunctionColumn*>(joinSide) != nullptr ||
                            dynamic_cast<const SimpleColumn*>(joinSide) != nullptr;

      if (joinable && es->functionJoinInfo())
        jsv.push_back(expressionToFuncJoin(es, jobInfo));
    }
  }

  return jsv;
}

// Build a single TupleConstantBooleanStep from a constant boolean parse tree node.
//
// n        node whose data()->getBoolVal() supplies the constant
// jobInfo  job context used to construct the step
//
// Returns a vector containing just that step.
const JobStepVector doConstantBooleanFilter(const ParseTree* n, JobInfo& jobInfo)
{
  JobStepVector jsv;

  // A plain `new` reports failure by throwing, so the result is never null and goes straight
  // into its SJSTEP owner.
  SJSTEP sjstep(new TupleConstantBooleanStep(jobInfo, n->data()->getBoolVal()));
  jsv.push_back(sjstep);

  return jsv;
}

// @bug5848: split `idbpartition(...) = 'a.b.c'` into three separate equality filters.
//
// The filter qualifies when its operator equals opeq, one operand is a FunctionColumn named
// "idbpartition", the other is a ConstantColumn whose string splits on '.' into exactly three
// tokens, and the function has at least three parameters that are all ReturnedColumns.
//
// sf       filter to inspect; not modified
// jsv      receives the job steps of the three generated filters; left untouched when the
//          function returns false
// jobInfo  job context forwarded to doSimpleFilter()
//
// Returns true when the filter was rewritten, false when the caller has to handle sf itself.
bool optimizeIdbPatitionSimpleFilter(SimpleFilter* sf, JobStepVector& jsv, JobInfo& jobInfo)
{
  //@bug5848, not equal filter is not optimized.
  if (sf->op()->op() != opeq.op())
    return false;

  const FunctionColumn* fc = dynamic_cast<const FunctionColumn*>(sf->lhs());
  const ConstantColumn* cc = dynamic_cast<const ConstantColumn*>(sf->rhs());

  // Accept the mirrored form `constant = idbpartition(...)` as well.
  if (fc == nullptr)
  {
    cc = dynamic_cast<const ConstantColumn*>(sf->lhs());
    fc = dynamic_cast<const FunctionColumn*>(sf->rhs());
  }

  // not a function or not idbpartition
  if (fc == nullptr || cc == nullptr || fc->functionName().compare("idbpartition") != 0)
    return false;

  // make sure the cc has 3 tokens
  vector<string> cv;
  auto str = cc->constval().safeString("");
  boost::split(cv, str, boost::is_any_of("."));

  if (cv.size() != 3)
    return false;

  // Validate all three function parameters before anything is allocated or appended to jsv.
  // The previous code indexed parms and dereferenced the cast result unchecked.
  const funcexp::FunctionParm& parms = fc->functionParms();

  if (parms.size() < 3)
    return false;

  ReturnedColumn* cols[3] = {nullptr, nullptr, nullptr};

  for (uint64_t i = 0; i < 3; i++)
  {
    // very weird parms order is: db root, physical partition, segment
    // logical partition string : physical partition, segment, db root
    const uint64_t parmIdx = (i + 1) % 3;

    if (!parms[parmIdx])
      return false;

    cols[i] = dynamic_cast<ReturnedColumn*>(parms[parmIdx]->data());

    if (cols[i] == nullptr)
      return false;
  }

  // construct separate filters, then make job steps
  SOP sop = sf->op();

  for (uint64_t i = 0; i < 3; i++)
  {
    ConstantColumn* rhs = new ConstantColumn(cv[i]);
    // NOTE(review): the generated filter is not deleted here.  doSimpleFilter() hands the
    // pointer to the steps it builds (addFilter(sf)), so it presumably has to outlive this
    // call -- confirm who owns it.
    SimpleFilter* f = new SimpleFilter(sop, cols[i]->clone(), rhs);
    JobStepVector v = doSimpleFilter(f, jobInfo);
    jsv.insert(jsv.end(), v.begin(), v.end());
  }

  return true;
}

// Convert one SimpleFilter (lhs <op> rhs) into job steps, dispatching on the node types of
// both operands as reported by TreeNode2Type().
//
// Cases handled, in the order they are tested:
//   * SIMPLECOLUMN op CONSTANTCOLUMN
//       - column has join info and the constant has a sequence other than -1: doSemiJoin()
//       - column has no schema name: doExpressionFilter()
//       - dictionary column with a non-null constant: pColStep + pDictionaryStep, or
//         pDictionaryScan + pColStep + TupleHashJoinStep
//       - non-null constant with a LIKE / NOT LIKE operator: doExpressionFilter()
//       - otherwise: a single pColStep/PseudoColStep carrying the converted constant
//   * SIMPLECOLUMN op SIMPLECOLUMN: doJoin(), or doExpressionFilter() for a non-"=" operator
//     between different tables
//   * CONSTANTCOLUMN op SIMPLECOLUMN: the operands of sf are swapped IN PLACE (operator
//     replaced by its opposite) and the function recurses
//   * either side ARITHMETICCOLUMN/FUNCTIONCOLUMN: idbPartition optimisation or
//     doExpressionFilter()
//   * rhs WINDOWFUNCTIONCOLUMN, or CONSTANTCOLUMN op CONSTANTCOLUMN: doExpressionFilter()
//
// sf       filter to translate; may be modified (operand swap); a null pointer yields an
//          empty vector
// jobInfo  job context; tuple keys, dictionary maps and table flags are updated
//
// Throws runtime_error, logic_error or IDBExcept for unsupported combinations (see the
// individual throw sites).
const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
{
  JobStepVector jsv;

  if (sf == nullptr)
    return jsv;

  // cout << "doSimpleFilter " << endl;
  // cout << *sf << endl;
  ReturnedColumn* lhs;
  ReturnedColumn* rhs;
  SOP sop;

  lhs = sf->lhs();
  rhs = sf->rhs();
  sop = sf->op();

  TreeNodeType lhsType;
  TreeNodeType rhsType;

  lhsType = TreeNode2Type(lhs);
  rhsType = TreeNode2Type(rhs);

  CalpontSystemCatalog::OID tbl_oid = 0;

  SJSTEP sjstep;

  if (lhsType == SIMPLECOLUMN && rhsType == CONSTANTCOLUMN)
  {
    // Numeric code of the comparison operator, as produced by op2num().
    int8_t cop = op2num(sop);
    const SimpleColumn* sc = static_cast<const SimpleColumn*>(lhs);
    const ConstantColumn* cc = static_cast<const ConstantColumn*>(rhs);
    string alias(extractTableAlias(sc));
    string view(sc->viewName());
    string schema(sc->schemaName());
    tbl_oid = tableOid(sc, jobInfo.csc);

    if (sc->joinInfo() != 0 && cc->sequence() != -1)
    {
      // correlated, like in 'c1 in select 1' type sub queries.
      return doSemiJoin(sc, cc, jobInfo);
    }
    else if (sc->schemaName().empty())
    {
      // bug 3749, mark outer join table with isNull filter
      if (cc->isNull() && (opis == *sop || opisnull == *sop))
        jobInfo.tableHasIsNull.insert(getTableKey(jobInfo, tbl_oid, alias, "", view));

      return doExpressionFilter(sf, jobInfo);
    }

    utils::NullString constval(cc->constval());

    CalpontSystemCatalog::OID dictOid = 0;
    CalpontSystemCatalog::ColType ct = sc->colType();
    const PseudoColumn* pc = dynamic_cast<const PseudoColumn*>(sc);

    // XXX use this before connector sets colType in sc correctly.
    //    type of pseudo column is set by connector
    // For regular column-store columns the type is re-read from the system catalog, keeping
    // only the charset number supplied on the SimpleColumn.
    if (!sc->schemaName().empty() && sc->isColumnStore() && !pc)
    {
      ct = jobInfo.csc->colType(sc->oid());
      ct.charsetNumber = sc->colType().charsetNumber;
    }
    // X

    //@bug 339 nulls are not stored in dictionary
    if ((dictOid = isDictCol(ct)) > 0 && !cc->isNull())
    {
      if (jobInfo.trace)
        cout << "Emit pTokenByScan/pCol for SimpleColumn op ConstantColumn" << endl;

      // dictionary, cannot be pseudo column
      pColStep* pcs = new pColStep(sc->oid(), tbl_oid, ct, jobInfo);
      pcs->alias(alias);
      pcs->view(view);
      pcs->name(sc->columnName());
      pcs->schema(sc->schemaName());
      pcs->cardinality(sc->cardinality());

      // TEXT/BLOB columns, and columns for which filterWithDictionary() says so, are filtered
      // by a pDictionaryStep fed from the column step; everything else takes the dictionary
      // scan + hash join route below.
      if (ct.colDataType == execplan::CalpontSystemCatalog::TEXT ||
          ct.colDataType == execplan::CalpontSystemCatalog::BLOB ||
          filterWithDictionary(dictOid, jobInfo.stringScanThreshold))
      {
        pDictionaryStep* pds = new pDictionaryStep(dictOid, tbl_oid, ct, jobInfo);
        jobInfo.keyInfo->dictOidToColOid[dictOid] = sc->oid();
        pds->alias(alias);
        pds->view(view);
        pds->name(sc->columnName());
        pds->schema(sc->schemaName());
        pds->cardinality(sc->cardinality());

        // Add the filter
        pds->addFilter(cop, constval.safeString(""));

        // data list for pcolstep output
        AnyDataListSPtr spdl1(new AnyDataList());

        JobStepAssociation outJs1;
        outJs1.outAdd(spdl1);
        pcs->outputAssociation(outJs1);

        // data list for pdictionarystep output
        AnyDataListSPtr spdl2(new AnyDataList());

        JobStepAssociation outJs2;
        outJs2.outAdd(spdl2);
        pds->outputAssociation(outJs2);

        // Associate pcs with pds
        JobStepAssociation outJs;
        outJs.outAdd(spdl1);
        pds->inputAssociation(outJs);

        sjstep.reset(pcs);
        jsv.push_back(sjstep);
        sjstep.reset(pds);
        jsv.push_back(sjstep);

        // save for expression transformation
        pds->addFilter(sf);

        // token column
        CalpontSystemCatalog::ColType tct;
        tct.colDataType = CalpontSystemCatalog::BIGINT;
        tct.colWidth = 8;
        tct.scale = 0;
        tct.precision = 0;
        tct.compressionType = ct.compressionType;
        TupleInfo ti(setTupleInfo(tct, sc->oid(), jobInfo, tbl_oid, sc, alias));
        jobInfo.keyInfo->token2DictTypeMap[ti.key] = ct;
        pcs->tupleId(ti.key);

        // string column
        ti = setTupleInfo(ct, dictOid, jobInfo, tbl_oid, sc, alias);
        pds->tupleId(ti.key);
        jobInfo.keyInfo->dictKeyMap[pcs->tupleId()] = ti.key;
        // NOTE(review): tokenOnly is keyed by the dictionary (string) key here, whereas
        // doJoin()/doSemiJoin() key it by the column step's tuple id -- confirm intended.
        jobInfo.tokenOnly[ti.key] = false;
      }
      else
      {
        pDictionaryScan* pds = new pDictionaryScan(dictOid, tbl_oid, ct, jobInfo);
        jobInfo.keyInfo->dictOidToColOid[dictOid] = sc->oid();
        pds->alias(alias);
        pds->view(view);
        pds->name(sc->columnName());
        pds->schema(sc->schemaName());
        pds->cardinality(sc->cardinality());

        // Add the filter
        pds->addFilter(cop, constval.safeString(""));

        // save for expression transformation
        pds->addFilter(sf);

        // Hash join combining the dictionary scan output with the column step output; both
        // sides describe the same column (same alias, view, schema and oid).
        TupleHashJoinStep* thj = new TupleHashJoinStep(jobInfo);
        thj->tableOid1(0);
        thj->tableOid2(tbl_oid);
        thj->alias1(alias);
        thj->alias2(alias);
        thj->view1(view);
        thj->view2(view);
        thj->schema1(schema);
        thj->schema2(schema);
        thj->oid1(sc->oid());
        thj->oid2(sc->oid());
        thj->joinId(0);
        thj->setJoinType(INNER);

        // 8-byte BIGINT type used for the token column.
        CalpontSystemCatalog::ColType dct;
        dct.colDataType = CalpontSystemCatalog::BIGINT;
        dct.colWidth = 8;
        dct.scale = 0;
        dct.precision = 0;
        dct.compressionType = ct.compressionType;

        TupleInfo ti(setTupleInfo(dct, sc->oid(), jobInfo, tbl_oid, sc, alias));
        jobInfo.keyInfo->token2DictTypeMap[ti.key] = ct;
        pds->tupleId(ti.key);  // pcs, pds use same tuple key, both 8-byte column
        pcs->tupleId(ti.key);
        thj->tupleId1(ti.key);
        thj->tupleId2(ti.key);

        // Mark the key token-only unless an entry already exists for it.
        if (jobInfo.tokenOnly.find(ti.key) == jobInfo.tokenOnly.end())
          jobInfo.tokenOnly[ti.key] = true;

        // Associate the steps
        AnyDataListSPtr spdl1(new AnyDataList());

        RowGroupDL* dl1 = new RowGroupDL(1, jobInfo.fifoSize);
        spdl1->rowGroupDL(dl1);
        dl1->OID(dictOid);

        JobStepAssociation outJs1;
        outJs1.outAdd(spdl1);
        pds->outputAssociation(outJs1);

        AnyDataListSPtr spdl2(new AnyDataList());

        RowGroupDL* dl2 = new RowGroupDL(1, jobInfo.fifoSize);
        spdl2->rowGroupDL(dl2);
        dl2->OID(sc->oid());

        JobStepAssociation outJs2;
        outJs2.outAdd(spdl2);
        pcs->outputAssociation(outJs2);

        JobStepAssociation outJs3;
        outJs3.outAdd(spdl1);
        outJs3.outAdd(spdl2);
        // Emission order: dictionary scan, column step, hash join.
        sjstep.reset(pds);
        jsv.push_back(sjstep);
        sjstep.reset(pcs);
        jsv.push_back(sjstep);

        thj->inputAssociation(outJs3);
        sjstep.reset(thj);
        jsv.push_back(sjstep);
      }
    }
    else if (!cc->isNull() && (cop & COMPARE_LIKE))  // both like and not like
    {
      return doExpressionFilter(sf, jobInfo);
    }
    else
    {
      // @bug 1151 string longer than colwidth of char/varchar.
      // Converted constant: `value` for regular types, `value128` for wide decimals; `rf` is
      // filled in by convertValueNum() and passed along to the column step's filter.
      int64_t value = 0;
      int128_t value128 = 0;
      uint8_t rf = 0;
#ifdef FAILED_ATOI_IS_ZERO

      // if cvn throws (because there's non-digit data in the string, treat that as zero rather than
      //   throwing
      try
      {
        bool isNull = cc->isNull();
        convertValueNum(constval.safeString(""), ct, isNull, rf, jobInfo.timeZone, value);

        if (ct.colDataType == CalpontSystemCatalog::FLOAT && !isNull)
        {
          float f = cc->getFloatVal();
          value = *(reinterpret_cast<int32_t*>(&f));
        }
        else if (ct.colDataType == CalpontSystemCatalog::DOUBLE && !isNull)
        {
          double d = cc->getDoubleVal();
          value = *(reinterpret_cast<int64_t*>(&d));
        }
      }
      catch (...)
      {
        switch (ct.colDataType)
        {
          case CalpontSystemCatalog::TINYINT:
          case CalpontSystemCatalog::SMALLINT:
          case CalpontSystemCatalog::MEDINT:
          case CalpontSystemCatalog::INT:
          case CalpontSystemCatalog::BIGINT:
          case CalpontSystemCatalog::UTINYINT:
          case CalpontSystemCatalog::USMALLINT:
          case CalpontSystemCatalog::UMEDINT:
          case CalpontSystemCatalog::UINT:
          case CalpontSystemCatalog::UBIGINT:
            value = 0;
            rf = 0;
            break;

          default: throw; break;
        }
      }

#else
      bool isNull = cc->isNull();

      if (ct.isWideDecimalType())
        convertValueNum(constval.safeString(""), ct, isNull, rf, jobInfo.timeZone, value128);
      else
        convertValueNum(constval.safeString(""), ct, isNull, rf, jobInfo.timeZone, value);

      // For FLOAT/DOUBLE columns the filter value is the raw bit pattern of the constant,
      // stored in the integer `value`.
      if (ct.colDataType == CalpontSystemCatalog::FLOAT && !isNull)
      {
        float f = cc->getFloatVal();
        value = *(reinterpret_cast<int32_t*>(&f));
      }
      else if (ct.colDataType == CalpontSystemCatalog::DOUBLE && !isNull)
      {
        double d = cc->getDoubleVal();
        value = *(reinterpret_cast<int64_t*>(&d));
      }

#endif

      // @bug 2584, make "= null" to COMPARE_NIL.
      if (cc->isNull() && (opeq == *sop || opne == *sop))
        cop = COMPARE_NIL;

      if (jobInfo.trace)
        cout << "doSimpleFilter Emit pCol for SimpleColumn op ConstantColumn = " << value << " ("
             << cc->constval().safeString() << ')' << endl;

      if (sf->indexFlag() == 0)
      {
        pColStep* pcs = NULL;

        if (pc == NULL)
          pcs = new pColStep(sc->oid(), tbl_oid, ct, jobInfo);
        else
          pcs = new PseudoColStep(sc->oid(), tbl_oid, pc->pseudoType(), ct, jobInfo);

        // The numeric filter is attached only for column-store columns.
        if (sc->isColumnStore())
        {
          if (ct.isWideDecimalType())
            pcs->addFilter(cop, value128, rf);
          else
            pcs->addFilter(cop, value, rf);
        }

        pcs->alias(alias);
        pcs->view(view);
        pcs->name(sc->columnName());
        pcs->schema(sc->schemaName());
        // Note: the cardinality comes from the filter here, not from the column.
        pcs->cardinality(sf->cardinality());

        sjstep.reset(pcs);
        jsv.push_back(sjstep);

        // save for expression transformation
        pcs->addFilter(sf);

        TupleInfo ti(setTupleInfo(ct, sc->oid(), jobInfo, tbl_oid, sc, alias));
        pcs->tupleId(ti.key);

        // A dictionary column reaches this branch only with a null constant (see the
        // dictionary test above); mark it token-only unless an entry already exists.
        if (dictOid > 0)  // cc->type() == ConstantColumn::NULLDATA
        {
          if (jobInfo.tokenOnly.find(ti.key) == jobInfo.tokenOnly.end())
            jobInfo.tokenOnly[ti.key] = true;
        }

        if (cc->isNull() && (opis == *sop || opisnull == *sop))
          jobInfo.tableHasIsNull.insert(getTableKey(jobInfo, tbl_oid, alias, sc->schemaName(), view));
      }
      else
      {
        // Index-based filtering is not available; the disabled block below is kept for
        // reference only.
        throw runtime_error("Uses indexes?");
#if 0
                CalpontSystemCatalog::IndexName indexName;
                indexName.schema = sc->schemaName();
                indexName.table = sc->tableName();
                indexName.index = sc->indexName();
                CalpontSystemCatalog::IndexOID indexOID = jobInfo.csc->lookupIndexNbr(indexName);
                pIdxWalk* pidxw = new pIdxWalk(JobStepAssociation(), JobStepAssociation(), 0,
                                               jobInfo.csc, indexOID.objnum, sc->oid(), tbl_oid,
                                               jobInfo.sessionId, jobInfo.txnId, jobInfo.verId, 0,
                                               jobInfo.statementId);

                pidxw->addSearchStr(cop, value);
                pidxw->cardinality(sf->cardinality());

                sjstep.reset(pidxw);
                jsv.push_back(sjstep);
                pIdxList* pidxL = new pIdxList(JobStepAssociation(),
                                               JobStepAssociation(), 0, jobInfo.csc,
                                               indexOID.listOID, tbl_oid, jobInfo.sessionId, jobInfo.txnId,
                                               jobInfo.verId, 0, jobInfo.statementId);

                /*output of idxwalk */
                AnyDataListSPtr spdl1(new AnyDataList());
                ZonedDL* dl1 = new ZonedDL(1, jobInfo.rm);
                dl1->OID(indexOID.objnum);
                spdl1->zonedDL(dl1);
                JobStepAssociation outJs1;
                outJs1.outAdd(spdl1);
                pidxw->outputAssociation(outJs1);

                /*inputput of idxlist */
                pidxL->inputAssociation(outJs1);

                /*output of idxlist */
                AnyDataListSPtr spdl2(new AnyDataList());
                ZonedDL* dl2 = new ZonedDL(1, jobInfo.rm);
                dl2->OID(indexOID.listOID);
                spdl2->zonedDL(dl2);
                JobStepAssociation outJs2;
                outJs2.outAdd(spdl2);
                pidxL->outputAssociation(outJs2);

                sjstep.reset(pidxL);
                jsv.push_back(sjstep);
#endif
      }
    }
  }
  else if (lhsType == SIMPLECOLUMN && rhsType == SIMPLECOLUMN)
  {
    // This is a join (different table) or filter step (same table)
    SimpleColumn* sc1 = static_cast<SimpleColumn*>(lhs);
    SimpleColumn* sc2 = static_cast<SimpleColumn*>(rhs);

    // @bug 1496. handle non equal operator as expression in v-table mode
    if ((sc1->tableName() != sc2->tableName() || sc1->tableAlias() != sc2->tableAlias() ||
         sc1->viewName() != sc2->viewName()) &&
        (sop->data() != "="))
    {
      return doExpressionFilter(sf, jobInfo);
    }

    // @bug 1349. no-op rules:
    // 1. If two columns of a simple filter are of the two different tables, and the
    //    filter operator is not "=", no-op this simple filter,
    // 2. If a join filter has "ANTI" option, no op this filter before ANTI hashjoin
    //    is supported in ExeMgr.
    // @bug 1933. Throw exception instead of no-op for MySQL virtual table
    //            (no connector re-filter).
    if (sf->joinFlag() == SimpleFilter::ANTI)
      throw runtime_error("Anti join is not currently supported");

    // Do a simple column join
    JobStepVector join = doJoin(sc1, sc2, jobInfo, sop, sf);
    // set cardinality for the hashjoin step. hj result card <= larger input card
    uint32_t card = 0;

    // Cap the filter's cardinality at the larger of the two column cardinalities.
    if (sf->cardinality() > sc1->cardinality() && sf->cardinality() > sc2->cardinality())
      card = (sc1->cardinality() > sc2->cardinality() ? sc1->cardinality() : sc2->cardinality());
    else
      card = sf->cardinality();

    // Applied to the last step returned by doJoin(); assumes doJoin() never returns an empty
    // vector.
    join[join.size() - 1].get()->cardinality(card);

    jsv.insert(jsv.end(), join.begin(), join.end());
  }
  else if (lhsType == CONSTANTCOLUMN && rhsType == SIMPLECOLUMN)
  {
    // swap the two and process as normal
    // Note that this rewrites the caller's filter object in place.
    SOP opsop(sop->opposite());
    sf->op(opsop);
    sf->lhs(rhs);
    sf->rhs(lhs);
    jsv = doSimpleFilter(sf, jobInfo);

    if (jsv.empty())
      throw runtime_error("Unhandled SimpleFilter");
  }
  else if (lhsType == ARITHMETICCOLUMN || rhsType == ARITHMETICCOLUMN || lhsType == FUNCTIONCOLUMN ||
           rhsType == FUNCTIONCOLUMN)
  {
    const ReturnedColumn* rc = static_cast<const ReturnedColumn*>(lhs);

    // An aggregate on the right is rejected unless the left side carries join info.
    if (rc && (!rc->joinInfo()) && rhsType == AGGREGATECOLUMN)
      throw IDBExcept(ERR_AGG_IN_WHERE);

    // @bug5848, CP elimination for idbPartition(col)
    JobStepVector sv;

    if (optimizeIdbPatitionSimpleFilter(sf, sv, jobInfo))
      jsv.swap(sv);
    else
      jsv = doExpressionFilter(sf, jobInfo);
  }
  // NOTE(review): in the two branches below the ARITHMETICCOLUMN/FUNCTIONCOLUMN alternatives
  // are already caught by the branch above, so only the AGGREGATECOLUMN case can reach them.
  else if (lhsType == SIMPLECOLUMN &&
           (rhsType == AGGREGATECOLUMN || rhsType == ARITHMETICCOLUMN || rhsType == FUNCTIONCOLUMN))
  {
    const SimpleColumn* sc = static_cast<const SimpleColumn*>(lhs);
    const ReturnedColumn* rc = static_cast<const ReturnedColumn*>(rhs);

    if (sc->joinInfo() != 0)
      return doSemiJoin(sc, rc, jobInfo);
    else if (rhsType == AGGREGATECOLUMN)
      throw IDBExcept(ERR_AGG_IN_WHERE);
    else
      throw logic_error("doSimpleFilter: Unhandled SimpleFilter.");
  }
  else if (rhsType == SIMPLECOLUMN &&
           (lhsType == AGGREGATECOLUMN || lhsType == ARITHMETICCOLUMN || lhsType == FUNCTIONCOLUMN))
  {
    const SimpleColumn* sc = static_cast<const SimpleColumn*>(rhs);
    const ReturnedColumn* rc = static_cast<const ReturnedColumn*>(lhs);

    if (sc->joinInfo() != 0)
      return doSemiJoin(sc, rc, jobInfo);
    else if (lhsType == AGGREGATECOLUMN)
      throw IDBExcept(ERR_AGG_IN_WHERE);
    else
      throw logic_error("doSimpleFilter: Unhandled SimpleFilter.");
  }
  else if (rhsType == WINDOWFUNCTIONCOLUMN)
  {
    // workaround for IN subquery with window function
    // window function as arguments of a function is not covered !!
    jsv = doExpressionFilter(sf, jobInfo);
  }
  else if (lhsType == CONSTANTCOLUMN && rhsType == CONSTANTCOLUMN)
  {
    jsv = doExpressionFilter(sf, jobInfo);
  }
  else
  {
    cerr << boldStart << "doSimpleFilter: Unhandled SimpleFilter: left = " << lhsType
         << ", right = " << rhsType << boldStop << endl;

    throw logic_error("doSimpleFilter: Unhandled SimpleFilter.");
  }

  return jsv;
}

// Translate the ON clause of an outer join into job steps.
//
// The function works on a private copy of the ON-clause parse tree:
//   1. First traversal: every "=" SimpleFilter comparing SimpleColumns that differ in table
//      name, alias or view is turned into join steps via doJoin().  Any other SimpleFilter is
//      wrapped in an ExpressionStep and remembered as a function-join candidate when its
//      functionJoinInfo() is set.  Nodes holding an OR operator are skipped together with
//      their subtrees.
//   2. If the last step collected so far is not a TupleHashJoinStep, the candidates are
//      converted with expressionToFuncJoin().
//   3. If a hash join step is then available, a second traversal converts single-table
//      SimpleFilters (same-table column pair, or column vs. constant) on joined, non-outer
//      tables into ON-clause filter steps.  All solved nodes are unlinked from the copied
//      tree and whatever remains becomes one ExpressionStep tied to the join id.
//   4. Otherwise the whole copied tree becomes a single ExpressionStep.
//
// Returns an empty vector when oj is null.
// Throws runtime_error for ANTI joins and when a solved node cannot be located in the
// child->parent map while pruning the tree.
const JobStepVector doOuterJoinOnFilter(OuterJoinOnFilter* oj, JobInfo& jobInfo)
{
  JobStepVector jsv;

  if (oj == 0)
    return jsv;

  // cout << "doOuterJoinOnFilter " << endl;
  // cout << *oj << endl;

  // Parse the join on filter to join steps and an expression step, if any.
  stack<ParseTree*> nodeStack;        // stack used for pre-order traverse
  set<ParseTree*> doneNodes;          // solved joins and simple filters
  map<ParseTree*, ParseTree*> cpMap;  // <child, parent> link for node removal
  JobStepVector join;                 // join step with its projection steps
  bool keepFilters = false;           // keep filters for cross engine step

  // To compromise the front end difficulty on setting outer attributes.
  set<uint64_t> tablesInOuter;

  // root: work on a copy so the caller's tree is left alone; the root has no parent.
  ParseTree* filters = new ParseTree(*(oj->pt().get()));
  nodeStack.push(filters);
  cpMap[filters] = NULL;

  // @bug5311, optimization for outer join on clause
  set<uint64_t> tablesInJoin;

  // @bug3683, function join
  vector<pair<ParseTree*, SJSTEP> > fjCandidates;

  // while stack is not empty
  while (!nodeStack.empty())
  {
    ParseTree* cn = nodeStack.top();  // current node
    nodeStack.pop();
    TreeNode* tn = cn->data();
    Operator* op = dynamic_cast<Operator*>(tn);  // AND | OR

    // Do not descend below an OR: its children are never pushed on the stack.
    if (op != NULL && (*op == opOR || *op == opor))
      continue;

    // join is expressed as SimpleFilter
    SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);

    if (sf != NULL)
    {
      if (sf->joinFlag() == SimpleFilter::ANTI)
        throw runtime_error("Anti join is not currently supported");

      ReturnedColumn* lhs = sf->lhs();
      ReturnedColumn* rhs = sf->rhs();
      SOP sop = sf->op();

      SimpleColumn* sc1 = dynamic_cast<SimpleColumn*>(lhs);
      SimpleColumn* sc2 = dynamic_cast<SimpleColumn*>(rhs);

      // An equality between two SimpleColumns that differ in table name, alias or view
      // is handled as a join.
      if ((sc1 != NULL && sc2 != NULL) && (sop->data() == "=") &&
          (sc1->tableName() != sc2->tableName() || sc1->tableAlias() != sc2->tableAlias() ||
           sc1->viewName() != sc2->viewName()))
      {
        // @bug3037, workaround on join order, wish this can be corrected soon,
        // cascade outer table attribute.
        CalpontSystemCatalog::OID tableOid1 = tableOid(sc1, jobInfo.csc);
        uint64_t tid1 =
            getTableKey(jobInfo, tableOid1, sc1->tableAlias(), sc1->schemaName(), sc1->viewName());
        CalpontSystemCatalog::OID tableOid2 = tableOid(sc2, jobInfo.csc);
        uint64_t tid2 =
            getTableKey(jobInfo, tableOid2, sc2->tableAlias(), sc2->schemaName(), sc2->viewName());

        // A table already recorded as outer forces returnAll on its column in this filter.
        if (tablesInOuter.find(tid1) != tablesInOuter.end())
          sc1->returnAll(true);
        else if (tablesInOuter.find(tid2) != tablesInOuter.end())
          sc2->returnAll(true);

        // Record the side that has returnAll set when the other side does not.
        if (sc1->returnAll() && !sc2->returnAll())
          tablesInOuter.insert(tid1);
        else if (!sc1->returnAll() && sc2->returnAll())
          tablesInOuter.insert(tid2);

        tablesInJoin.insert(tid1);
        tablesInJoin.insert(tid2);

        join = doJoin(sc1, sc2, jobInfo, sop, sf);
        // set cardinality for the hashjoin step.
        uint32_t card = sf->cardinality();

        // If the filter's cardinality exceeds both columns', use the larger column cardinality.
        if (sf->cardinality() > sc1->cardinality() && sf->cardinality() > sc2->cardinality())
          card = ((sc1->cardinality() > sc2->cardinality()) ? sc1->cardinality() : sc2->cardinality());

        // The last element of join is the step that receives the cardinality.
        // NOTE(review): assumes doJoin() never returns an empty vector -- confirm.
        join[join.size() - 1].get()->cardinality(card);

        jsv.insert(jsv.end(), join.begin(), join.end());
        doneNodes.insert(cn);
      }
      else
      {
        ExpressionStep* es = new ExpressionStep(jobInfo);

        if (es == NULL)
          throw runtime_error("Failed to create ExpressionStep 2");

        // sjstep takes ownership of es; the step is kept only if it qualifies as a
        // function-join candidate, otherwise it is released when sjstep goes out of scope.
        SJSTEP sjstep;
        es->expressionFilter(sf, jobInfo);
        sjstep.reset(es);

        if (es->functionJoinInfo())
          fjCandidates.push_back(make_pair(cn, sjstep));
      }
    }

    // Add right and left to the stack.
    ParseTree* right = cn->right();

    if (right != NULL)
    {
      cpMap[right] = cn;
      nodeStack.push(right);
    }

    ParseTree* left = cn->left();

    if (left != NULL)
    {
      cpMap[left] = cn;
      nodeStack.push(left);
    }
  }

  // check if there are any join steps in jsv.
  bool isOk = true;
  TupleHashJoinStep* thjs = NULL;

  // thjs is reassigned whenever the cast of the current step differs from it, so after the
  // loop it equals the cast of the LAST step in jsv (NULL if that step is not a
  // TupleHashJoinStep or jsv is empty).
  for (JobStepVector::iterator i = jsv.begin(); i != jsv.end(); i++)
  {
    if (dynamic_cast<TupleHashJoinStep*>(i->get()) != thjs)
      thjs = dynamic_cast<TupleHashJoinStep*>(i->get());
  }

  // no simple column join found, try function join
  if (thjs == NULL)
  {
    // check any expressions can be converted to function joins.
    vector<pair<ParseTree*, SJSTEP> >::iterator i = fjCandidates.begin();

    while (i != fjCandidates.end())
    {
      ExpressionStep* es = dynamic_cast<ExpressionStep*>((i->second).get());
      SJSTEP sjstep = expressionToFuncJoin(es, jobInfo);
      idbassert(sjstep.get());
      jsv.push_back(sjstep);

      doneNodes.insert(i->first);

      i++;
    }
  }

  // check again if we got a join step (same "cast of the last step" semantics as above).
  for (JobStepVector::iterator i = jsv.begin(); i != jsv.end(); i++)
  {
    if (dynamic_cast<TupleHashJoinStep*>(i->get()) != thjs)
      thjs = dynamic_cast<TupleHashJoinStep*>(i->get());
  }

  if (thjs != NULL)
  {
    // @bug5311, optimization for outer join on clause, move out small side simple filters.
    // some repeat code, but better done after join sides settled.
    nodeStack.push(filters);
    cpMap[filters] = NULL;

    // while stack is not empty
    while (!nodeStack.empty())
    {
      ParseTree* cn = nodeStack.top();  // current node
      nodeStack.pop();
      TreeNode* tn = cn->data();
      Operator* op = dynamic_cast<Operator*>(tn);  // AND | OR

      // As in the first pass, nothing below an OR is visited.
      if (op != NULL && (*op == opOR || *op == opor))
        continue;

      SimpleFilter* sf = dynamic_cast<SimpleFilter*>(tn);

      if ((sf != NULL) && (doneNodes.find(cn) == doneNodes.end()))
      {
        // joins are done, this is not a join.
        ReturnedColumn* lhs = sf->lhs();
        ReturnedColumn* rhs = sf->rhs();
        SOP sop = sf->op();

        // handle simple-simple | simple-constant | constant-simple
        // sc stays NULL for any other shape, which leaves the node in the tree.
        SimpleColumn* sc = NULL;
        SimpleColumn* sc1 = dynamic_cast<SimpleColumn*>(lhs);
        SimpleColumn* sc2 = dynamic_cast<SimpleColumn*>(rhs);

        if ((sc1 != NULL && sc2 != NULL) &&
            (sc1->tableName() == sc2->tableName() && sc1->tableAlias() == sc2->tableAlias() &&
             sc1->viewName() == sc2->viewName()))
        {
          sc = sc1;  // same table, just check sc1
        }
        else if (sc1 != NULL && dynamic_cast<ConstantColumn*>(rhs) != NULL)
        {
          sc = sc1;
        }
        else if (sc2 != NULL && dynamic_cast<ConstantColumn*>(lhs) != NULL)
        {
          sc = sc2;
        }

        if (sc != NULL)
        {
          CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc);
          uint64_t tid = getTableKey(jobInfo, tblOid, sc->tableAlias(), sc->schemaName(), sc->viewName());

          // skip outer table filters or table not directly involved in the outer join
          // (this continue also skips pushing the node's children)
          if (tablesInOuter.find(tid) != tablesInOuter.end() || tablesInJoin.find(tid) == tablesInJoin.end())
            continue;

          JobStepVector sfv = doSimpleFilter(sf, jobInfo);
          ExpressionStep* es = NULL;

          // Flag every produced step as an ON-clause filter; expression steps are
          // additionally tied to the join through its id.
          for (JobStepVector::iterator k = sfv.begin(); k != sfv.end(); k++)
          {
            k->get()->onClauseFilter(true);

            if ((es = dynamic_cast<ExpressionStep*>(k->get())) != NULL)
              es->associatedJoinId(thjs->joinId());
          }

          jsv.insert(jsv.end(), sfv.begin(), sfv.end());

          // MCOL-1182 if we are doing a join between a cross engine
          // step and a constant then keep the filter for the cross
          // engine step instead of deleting it further down.
          if (!sc->isColumnStore())
          {
            keepFilters = true;
          }

          doneNodes.insert(cn);
        }
      }

      // Add right and left to the stack.
      ParseTree* right = cn->right();

      if (right != NULL)
      {
        cpMap[right] = cn;
        nodeStack.push(right);
      }

      ParseTree* left = cn->left();

      if (left != NULL)
      {
        cpMap[left] = cn;
        nodeStack.push(left);
      }
    }

    // remove joins from the original filters
    // For each solved node c with parent p and grandparent pp: p is replaced by c's sibling,
    // cpMap is updated for the re-linked sibling, and c/p are deleted unless keepFilters is set.
    ParseTree* nullTree = NULL;

    for (set<ParseTree*>::iterator i = doneNodes.begin(); i != doneNodes.end() && isOk; i++)
    {
      ParseTree* c = *i;
      map<ParseTree*, ParseTree*>::iterator j = cpMap.find(c);

      // A solved node missing from the map means the bookkeeping is broken; isOk ends the loop.
      if (j == cpMap.end())
      {
        isOk = false;
        continue;
      }

      ParseTree* p = j->second;

      if (p == NULL)
      {
        // c is the root: nothing is left of the filter tree.
        filters = NULL;

        if (!keepFilters)
        {
          delete c;
        }
      }
      else
      {
        map<ParseTree*, ParseTree*>::iterator k = cpMap.find(p);

        if (k == cpMap.end())
        {
          isOk = false;
          continue;
        }

        ParseTree* pp = k->second;

        if (pp == NULL)
        {
          // p is the root: c's sibling becomes the new root.
          if (p->left() == c)
            filters = p->right();
          else
            filters = p->left();

          cpMap[filters] = NULL;
        }
        else
        {
          // Splice c's sibling into pp in the slot p occupied.
          if (p->left() == c)
          {
            if (pp->left() == p)
              pp->left(p->right());
            else
              pp->right(p->right());

            cpMap[p->right()] = pp;
          }
          else
          {
            if (pp->left() == p)
              pp->left(p->left());
            else
              pp->right(p->left());

            cpMap[p->left()] = pp;
          }
        }

        // Detach p's children before p may be deleted.
        // NOTE(review): presumably ParseTree's destructor would otherwise free the
        // re-linked sibling -- confirm against ParseTree.
        p->left(nullTree);
        p->right(nullTree);

        if (!keepFilters)
        {
          delete p;
          delete c;
        }
      }
    }

    // construct an expression step, if additional comparison exists.
    if (isOk && filters != NULL && filters->data() != NULL)
    {
      ExpressionStep* es = new ExpressionStep(jobInfo);

      if (es == NULL)
        throw runtime_error("Failed to create ExpressionStep 1");

      es->expressionFilter(filters, jobInfo);
      es->associatedJoinId(thjs->joinId());
      SJSTEP sjstep(es);
      jsv.push_back(sjstep);
    }
  }
  else
  {
    // Due to Calpont view handling, some joins may treated as expressions.
    ExpressionStep* es = new ExpressionStep(jobInfo);

    if (es == NULL)
      throw runtime_error("Failed to create ExpressionStep 1");

    es->expressionFilter(filters, jobInfo);
    SJSTEP sjstep(es);
    jsv.push_back(sjstep);
  }

  // Release what is left of the copied tree (filters may be NULL at this point).
  delete filters;

  if (!isOk)
  {
    throw runtime_error("Failed to parse join condition.");
  }

  if (jobInfo.trace)
  {
    ostringstream oss;
    oss << "\nOuterJoinOn steps: " << endl;
    ostream_iterator<JobStepVector::value_type> oIter(oss, "\n");
    copy(jsv.begin(), jsv.end(), oIter);
    cout << oss.str();
  }

  return jsv;
}

// Try to merge an incoming dictionary filter into a dictionary step already present in jsv1.
//
// jsv1  steps collected so far (searched for a merge target).
// jsv2  incoming filter; expected to be [pColStep, pDictionaryStep].
// bop   boolean operator joining the two sides; for BOP_OR only the last step of jsv1 is tried.
//
// A step in jsv1 is a merge target when it is a pDictionaryStep with the same onClauseFilter
// flag and tupleId as the incoming one, is directly preceded by a pColStep, and both sides'
// BOPs are either BOP_NONE or equal to bop.  On success the incoming filter string and
// filter objects are appended to the target (its BOP is set to bop if it was BOP_NONE) and
// true is returned; jsv2 itself is left untouched.  Returns false when nothing was merged.
bool tryCombineDictionary(JobStepVector& jsv1, JobStepVector& jsv2, int8_t bop)
{
  // Both "end() - 1" below and "it2 - 1" need these sizes; do not rely on the caller alone.
  if (jsv1.empty() || jsv2.size() < 2)
    return false;

  JobStepVector::iterator it2 = jsv2.end() - 1;

  // the caller has already checked that *it2 is a pDictionaryStep
  if (typeid(*((it2 - 1)->get())) != typeid(pColStep))
    return false;

  pDictionaryStep* ipdsp = dynamic_cast<pDictionaryStep*>(it2->get());

  if (ipdsp == NULL)
    return false;

  const bool onClauseFilter = ipdsp->onClauseFilter();

  // The incoming side can only be merged if its own BOP is unset or agrees with bop.
  const bool incomingBopOk = (ipdsp->BOP() == BOP_NONE || ipdsp->BOP() == bop);

  const JobStepVector::iterator first = jsv1.begin();
  const JobStepVector::iterator end = jsv1.end();

  // only try the last step in jsv1 if the operator is OR.
  JobStepVector::iterator iter = (bop == BOP_OR) ? (end - 1) : first;

  for (; iter != end; ++iter)
  {
    pDictionaryStep* pdsp = dynamic_cast<pDictionaryStep*>(iter->get());

    if (pdsp == NULL || pdsp->onClauseFilter() != onClauseFilter)
      continue;

    // A dictionary step at the very front has no preceding pColStep; looking at
    // (iter - 1) there would step before begin().
    if (iter == first)
      continue;

    // The OIDs must match and the previous step must be a pColStep.
    if (ipdsp->tupleId() != pdsp->tupleId() || dynamic_cast<pColStep*>((iter - 1)->get()) == NULL)
      continue;

    const bool targetUnset = (pdsp->BOP() == BOP_NONE);

    if (!targetUnset && pdsp->BOP() != bop)
      continue;

    if (!incomingBopOk)
      continue;

    pdsp->appendFilter(ipdsp->filterString(), ipdsp->filterCount());

    if (targetUnset)
      pdsp->setBOP(bop);

    pdsp->appendFilter(ipdsp->getFilters());
    return true;
  }

  return false;
}

// Merge hook for dictionary-scan filters (jsv2 = [pDictionaryScan, pColStep, join step]).
//
// The merge logic is compiled out (see bug3321 below), so this function currently ignores
// its arguments and always returns false, i.e. "nothing was combined".  The disabled code
// is kept for reference only.
bool tryCombineDictionaryScan(JobStepVector& jsv1, JobStepVector& jsv2, int8_t bop)
{
// disable dictionary scan -- bug3321
#if 0
    JobStepVector::iterator it2 = jsv2.begin();

    if (typeid(*((it2 + 1)->get())) != typeid(pColStep))
        return false;

    if ((typeid(*((it2 + 2)->get())) != typeid(TupleHashJoinStep)) &&
            (typeid(*((it2 + 2)->get())) != typeid(HashJoinStep)))
        return false;

    pDictionaryScan* ipdsp = dynamic_cast<pDictionaryScan*>(it2->get());
    bool onClauseFilter = ipdsp->onClauseFilter();

    JobStepVector::iterator iter = jsv1.begin();
    JobStepVector::iterator end = jsv1.end();

    if (bop == BOP_OR)
    {
        if (jsv1.size() >= 3)
            iter = end - 3;
        else
            return false;
    }

    while (iter != end)
    {
        pDictionaryScan* pdsp = dynamic_cast<pDictionaryScan*>((*iter).get());

        if (pdsp != NULL && pdsp->onClauseFilter() == onClauseFilter)
        {
            // If the OID's match and the BOP's match and the previous step is pcolstep,
            // then append the filters.
            if ((ipdsp->tupleId() == pdsp->tupleId()) &&
                    (typeid(*((iter + 1)->get())) == typeid(pColStep)))
            {
                if (pdsp->BOP() == BOP_NONE)
                {
                    if (ipdsp->BOP() == BOP_NONE || ipdsp->BOP() == bop)
                    {
                        pdsp->appendFilter(ipdsp->filterString(), ipdsp->filterCount());
                        pdsp->setBOP(bop);
                        pdsp->appendFilter(ipdsp->getFilters());
                        return true;
                    }
                }
                else if (pdsp->BOP() == bop)
                {
                    if (ipdsp->BOP() == BOP_NONE || ipdsp->BOP() == bop)
                    {
                        pdsp->appendFilter(ipdsp->filterString(), ipdsp->filterCount());
                        pdsp->appendFilter(ipdsp->getFilters());
                        return true;
                    }
                }
            }
        }

        ++iter;
    }

#endif

  // Live behaviour: never combines.
  return false;
}

// Attempt to fold the filter(s) carried by jsv2 into a step that already exists in jsv1,
// so that several predicates on one column end up on a single pColStep / pDictionaryStep.
//
// Dispatch by the shape of jsv2:
//   2 steps ending in a pDictionaryStep    -> tryCombineDictionary()
//   3 steps starting with a pDictionaryScan -> tryCombineDictionaryScan()
//   exactly 1 pColStep                      -> handled here
//   anything else                           -> not combinable
//
// Returns true when the filters were appended to a step of jsv1, false otherwise.
bool tryCombineFilters(JobStepVector& jsv1, JobStepVector& jsv2, int8_t bop)
{
  // Nothing collected yet, so there is nothing to merge into.
  if (jsv1.empty())
    return false;

  // Dictionary filter: pcolstep followed by pdictionarystep.
  if (jsv2.size() == 2 && typeid(*jsv2.back().get()) == typeid(pDictionaryStep))
    return tryCombineDictionary(jsv1, jsv2, bop);

  // Dictionary scan filter: pdictionaryscan, pcolstep and join.
  if (jsv2.size() == 3 && typeid(*jsv2.front().get()) == typeid(pDictionaryScan))
    return tryCombineDictionaryScan(jsv1, jsv2, bop);

  // From here on only a lone, non-dictionary pColStep qualifies.
  if (jsv2.size() != 1)
    return false;

  pColStep* incoming = dynamic_cast<pColStep*>(jsv2.back().get());

  if (incoming == NULL)
    return false;

  const bool incomingOnClause = incoming->onClauseFilter();
  const JobStepVector::iterator stop = jsv1.end();

  // With OR only the most recent step of jsv1 is a candidate; otherwise scan them all.
  JobStepVector::iterator cur = (bop == BOP_OR) ? (stop - 1) : jsv1.begin();

  for (; cur != stop; ++cur)
  {
    pColStep* target = dynamic_cast<pColStep*>(cur->get());

    // Candidate must be a pColStep with the same ON-clause flag ...
    if (target == NULL || target->onClauseFilter() != incomingOnClause)
      continue;

    // ... on the same column ...
    if (incoming->tupleId() != target->tupleId())
      continue;

    // ... whose BOP is still unset or already equals bop ...
    const bool targetUnset = (target->BOP() == BOP_NONE);

    if (!targetUnset && target->BOP() != bop)
      continue;

    // ... and the incoming step's BOP must be compatible as well.
    if (incoming->BOP() != BOP_NONE && incoming->BOP() != bop)
      continue;

    target->appendFilter(incoming->filterString(), incoming->filterCount());

    if (targetUnset)
      target->setBOP(bop);

    target->appendFilter(incoming->getFilters());
    return true;
  }

  return false;
}

const JobStepVector doConstantFilter(const ConstantFilter* cf, JobInfo& jobInfo)
{
  JobStepVector jsv;

  if (cf == 0)
    return jsv;

  SOP op = cf->op();
  // default op is 'and'
  string opStr("and");

  if (op)
    opStr = ba::to_lower_copy(op->data());

  SJSTEP sjstep;

  if (opStr == "and" || opStr == "or")
  {
    SOP sop;

    const SSC sc = boost::dynamic_pointer_cast<SimpleColumn>(cf->col());

    // if column from subquery
    if (!sc || sc->schemaName().empty())
    {
      return doExpressionFilter(cf, jobInfo);
    }

    ConstantFilter::FilterList fl = cf->filterList();
    CalpontSystemCatalog::OID dictOid = 0;
    CalpontSystemCatalog::ColType ct = sc.get()->colType();
    PseudoColumn* pc = dynamic_cast<PseudoColumn*>(sc.get());

    // XXX use this before connector sets colType in sc correctly.
    //    type of pseudo column is set by connector
    if (!sc->schemaName().empty() && sc->isColumnStore() && !pc)
    {
      ct = jobInfo.csc->colType(sc->oid());
      ct.charsetNumber = sc->colType().charsetNumber;
    }

    // X
    CalpontSystemCatalog::OID tbOID = tableOid(sc.get(), jobInfo.csc);
    string alias(extractTableAlias(sc));
    string view(sc->viewName());
    string schema(sc->schemaName());

    if ((dictOid = isDictCol(ct)) > 0)
    {
      if (jobInfo.trace)
        cout << "Emit pTokenByScan/pCol for SimpleColumn op ConstantColumn "
                "[op ConstantColumn]"
             << endl;

      pColStep* pcs = new pColStep(sc->oid(), tbOID, ct, jobInfo);
      pcs->alias(alias);
      pcs->view(view);
      pcs->name(sc->columnName());
      pcs->schema(sc->schemaName());
      pcs->cardinality(sc->cardinality());

      if (ct.colDataType == execplan::CalpontSystemCatalog::TEXT ||
          ct.colDataType == execplan::CalpontSystemCatalog::BLOB ||
          filterWithDictionary(dictOid, jobInfo.stringScanThreshold))
      {
        pDictionaryStep* pds = new pDictionaryStep(dictOid, tbOID, ct, jobInfo);
        jobInfo.keyInfo->dictOidToColOid[dictOid] = sc->oid();
        pds->alias(alias);
        pds->view(view);
        pds->name(sc->columnName());
        pds->schema(sc->schemaName());
        pds->cardinality(sc->cardinality());

        if (op)
          pds->setBOP(bop2num(op));

        // Add the filter(s)
        for (unsigned i = 0; i < fl.size(); i++)
        {
          const SSFP sf = fl[i];
          const ConstantColumn* cc;
          cc = static_cast<const ConstantColumn*>(sf->rhs());
          sop = sf->op();

          // add each filter to pColStep
          int8_t cop = op2num(sop);

          // @bug 2584, make "= null" to COMPARE_NIL.
          if (cc->isNull() && (opeq == *sop || opne == *sop))
            cop = COMPARE_NIL;

          string value = cc->constval().safeString("");
          // Because, on a filter, we want to compare ignoring trailing spaces
          boost::algorithm::trim_right_if(value, boost::is_any_of(" "));

          pds->addFilter(cop, value);
        }

        // data list for pcolstep output
        AnyDataListSPtr spdl1(new AnyDataList());

        JobStepAssociation outJs1;
        outJs1.outAdd(spdl1);
        pcs->outputAssociation(outJs1);

        // data list for pdictionarystep output
        AnyDataListSPtr spdl2(new AnyDataList());

        JobStepAssociation outJs2;
        outJs2.outAdd(spdl2);
        pds->outputAssociation(outJs2);

        // Associate pcs with pds
        JobStepAssociation outJs;
        outJs.outAdd(spdl1);
        pds->inputAssociation(outJs);

        sjstep.reset(pcs);
        jsv.push_back(sjstep);
        sjstep.reset(pds);
        jsv.push_back(sjstep);

        // save for expression transformation
        pds->addFilter(cf);

        // token column
        CalpontSystemCatalog::ColType tct;
        tct.colDataType = CalpontSystemCatalog::BIGINT;
        tct.colWidth = 8;
        tct.scale = 0;
        tct.precision = 0;
        tct.compressionType = ct.compressionType;
        TupleInfo ti(setTupleInfo(tct, sc->oid(), jobInfo, tbOID, sc.get(), alias));
        jobInfo.keyInfo->token2DictTypeMap[ti.key] = ct;
        pcs->tupleId(ti.key);

        // string column
        ti = setTupleInfo(ct, dictOid, jobInfo, tbOID, sc.get(), alias);
        pds->tupleId(ti.key);
        jobInfo.keyInfo->dictKeyMap[pcs->tupleId()] = ti.key;
        jobInfo.tokenOnly[ti.key] = false;
      }
      else
      {
        pDictionaryScan* pds = new pDictionaryScan(dictOid, tbOID, ct, jobInfo);
        jobInfo.keyInfo->dictOidToColOid[dictOid] = sc.get()->oid();
        pds->alias(alias);
        pds->view(view);
        pds->name(sc.get()->columnName());
        pds->schema(sc.get()->schemaName());

        if (op)
          pds->setBOP(bop2num(op));

        // Add the filter(s)
        for (unsigned i = 0; i < fl.size(); i++)
        {
          const SSFP sf = fl[i];
          const ConstantColumn* cc;
          cc = static_cast<const ConstantColumn*>(sf->rhs());
          sop = sf->op();

          // add each filter to pColStep
          int8_t cop = op2num(sop);

          // @bug 2584, make "= null" to COMPARE_NIL.
          if (cc->isNull() && (opeq == *sop || opne == *sop))
            cop = COMPARE_NIL;

          string value = cc->constval().safeString("");
          // Because, on a filter, we want to compare ignoring trailing spaces
          boost::algorithm::trim_right_if(value, boost::is_any_of(" "));

          pds->addFilter(cop, value);
        }

        // save for expression transformation
        pds->addFilter(cf);

        TupleHashJoinStep* thj = new TupleHashJoinStep(jobInfo);
        thj->tableOid1(0);
        thj->tableOid2(tbOID);
        thj->alias1(alias);
        thj->alias2(alias);
        thj->view1(view);
        thj->view2(view);
        thj->schema1(schema);
        thj->schema2(schema);
        thj->oid1(sc->oid());
        thj->oid2(sc->oid());
        thj->joinId(0);
        thj->setJoinType(INNER);

        // Associate the steps
        AnyDataListSPtr spdl1(new AnyDataList());
        RowGroupDL* dl1 = new RowGroupDL(1, jobInfo.fifoSize);
        spdl1->rowGroupDL(dl1);
        dl1->OID(dictOid);

        JobStepAssociation outJs1;
        outJs1.outAdd(spdl1);
        pds->outputAssociation(outJs1);

        AnyDataListSPtr spdl2(new AnyDataList());
        RowGroupDL* dl2 = new RowGroupDL(1, jobInfo.fifoSize);
        spdl2->rowGroupDL(dl2);
        dl2->OID(sc->oid());

        JobStepAssociation outJs2;
        outJs2.outAdd(spdl2);
        pcs->outputAssociation(outJs2);

        JobStepAssociation outJs3;
        outJs3.outAdd(spdl1);
        outJs3.outAdd(spdl2);
        thj->inputAssociation(outJs3);

        sjstep.reset(pds);
        jsv.push_back(sjstep);
        sjstep.reset(pcs);
        jsv.push_back(sjstep);
        sjstep.reset(thj);
        jsv.push_back(sjstep);

        CalpontSystemCatalog::ColType dct;
        dct.colDataType = CalpontSystemCatalog::BIGINT;
        dct.colWidth = 8;
        dct.scale = 0;
        dct.precision = 0;
        dct.compressionType = ct.compressionType;

        TupleInfo ti(setTupleInfo(dct, sc->oid(), jobInfo, tbOID, sc.get(), alias));
        jobInfo.keyInfo->token2DictTypeMap[ti.key] = ct;
        pds->tupleId(ti.key);  // pcs, pds use same tuple key, both 8-byte column
        pcs->tupleId(ti.key);
        thj->tupleId1(ti.key);
        thj->tupleId2(ti.key);

        if (jobInfo.tokenOnly.find(ti.key) == jobInfo.tokenOnly.end())
          jobInfo.tokenOnly[ti.key] = true;
      }
    }
    else
    {
      if (jobInfo.trace)
        cout << "Emit pCol for SimpleColumn op ConstantColumn [op ConstantColumn]" << endl;

      CalpontSystemCatalog::OID tblOid = tableOid(sc.get(), jobInfo.csc);
      string alias(extractTableAlias(sc));
      pColStep* pcs = NULL;

      if (pc == NULL)
        pcs = new pColStep(sc->oid(), tblOid, ct, jobInfo);
      else
        pcs = new PseudoColStep(sc->oid(), tblOid, pc->pseudoType(), ct, jobInfo);

      pcs->alias(extractTableAlias(sc));
      pcs->view(sc->viewName());
      pcs->name(sc->columnName());
      pcs->schema(sc->schemaName());

      if (sc->isColumnStore())
      {
        if (op)
          pcs->setBOP(bop2num(op));

        for (unsigned i = 0; i < fl.size(); i++)
        {
          const SSFP sf = fl[i];
          const ConstantColumn* cc;
          cc = static_cast<const ConstantColumn*>(sf->rhs());
          sop = sf->op();

          // add each filter to pColStep
          int8_t cop = op2num(sop);
          int64_t value = 0;
          int128_t value128 = 0;
          string constval = cc->constval().safeString("");

          // @bug 1151 string longer than colwidth of char/varchar.
          uint8_t rf = 0;
          bool isNull = cc->isNull();

          if (ct.isWideDecimalType())
            convertValueNum(constval, ct, isNull, rf, jobInfo.timeZone, value128);
          else
            convertValueNum(constval, ct, isNull, rf, jobInfo.timeZone, value);

          if (ct.colDataType == CalpontSystemCatalog::FLOAT && !isNull)
          {
            float f = cc->getFloatVal();
            value = *(reinterpret_cast<int32_t*>(&f));
          }
          else if (ct.colDataType == CalpontSystemCatalog::DOUBLE && !isNull)
          {
            double d = cc->getDoubleVal();
            value = *(reinterpret_cast<int64_t*>(&d));
          }

          // @bug 2584, make "= null" to COMPARE_NIL.
          if (cc->isNull() && (opeq == *sop || opne == *sop))
            cop = COMPARE_NIL;

          if (ct.isWideDecimalType())
            pcs->addFilter(cop, value128, rf);
          else
            pcs->addFilter(cop, value, rf);
        }
      }

      // save for expression transformation
      pcs->addFilter(cf);

      sjstep.reset(pcs);
      jsv.push_back(sjstep);

      // XXX use this before connector sets colType in sc correctly.
      CalpontSystemCatalog::ColType ct = sc->colType();

      if (!sc->schemaName().empty() && sc->isColumnStore() && !pc)
      {
        ct = jobInfo.csc->colType(sc->oid());
        ct.charsetNumber = sc->colType().charsetNumber;
      }
      TupleInfo ti(setTupleInfo(ct, sc->oid(), jobInfo, tblOid, sc.get(), alias));
      // X			TupleInfo ti(setTupleInfo(sc->colType(), sc->oid(), jobInfo, tblOid, sc.get(),
      // alias));
      pcs->tupleId(ti.key);
    }
  }
  else
  {
    cerr << boldStart << "doConstantFilter: Can only handle 'and' and 'or' right now, got '" << opStr << "'"
         << boldStop << endl;
    throw logic_error("doConstantFilter: Not handled operation type.");
  }

  return jsv;
}

// Build the job steps for "pc = v1 OR pc = v2 OR ..." over the given constant values by
// assembling a temporary ConstantFilter and passing it to doConstantFilter().
static JobStepVector doPseudoColumnInFilter(PseudoColumn* pc, const vector<string>& values, JobInfo& jobInfo)
{
  // Owned by a smart pointer so the filter is released even if doConstantFilter() throws.
  boost::shared_ptr<ConstantFilter> cf(new ConstantFilter());
  SOP sop(new LogicOperator("or"));
  SSC col(pc->clone());
  cf->op(sop);
  cf->col(col);

  // From here on sop is the comparison operator shared by all the simple filters.
  sop.reset(new PredicateOperator("="));

  for (uint64_t j = 0; j < values.size(); j++)
  {
    SimpleFilter* f = new SimpleFilter(sop, col->clone(), new ConstantColumn(values[j]));
    cf->pushFilter(f);
  }

  return doConstantFilter(cf.get(), jobInfo);
}

// Convert a FunctionColumn filter node into job steps.
//
// Only IN lists get special treatment, and only when every list entry is a constant:
//   idbpartition(a, b, c) IN ('x.y.z', ...)  each constant is split on '.' into three
//       parts; one OR-ed constant filter is produced per pseudo-column argument of
//       idbpartition (argument 0 gets the third part, 1 the first, 2 the second), and the
//       full expression filter is appended after them.  If some entry is not a three-part
//       constant, only the expression filter is emitted.
//   pseudoColumn IN (c1, c2, ...)            one OR-ed constant filter on the pseudo column.
// Everything else, including an IN list with a non-constant entry, is evaluated by
// doExpressionFilter().
//
// n must hold a FunctionColumn (checked with idbassert).
const JobStepVector doFunctionFilter(const ParseTree* n, JobInfo& jobInfo)
{
  FunctionColumn* fc = dynamic_cast<FunctionColumn*>(n->data());
  idbassert(fc);

  JobStepVector jsv;
  string functionName = ba::to_lower_copy(fc->functionName());
  const funcexp::FunctionParm& parms = fc->functionParms();

  // parms[0] and "parms.size() - 1" below require at least one parameter.
  if ((functionName == "in" || functionName == " in ") && !parms.empty())
  {
    FunctionColumn* parm0Fc = dynamic_cast<FunctionColumn*>(parms[0]->data());
    PseudoColumn* parm0Pc = dynamic_cast<PseudoColumn*>(parms[0]->data());
    const uint64_t listSize = parms.size() - 1;  // number of entries in the IN list

    if (parm0Fc && parm0Fc->functionName().compare("idbpartition") == 0)
    {
      // constParms[k] collects, for pseudo-column argument k, its part of every constant.
      vector<vector<string> > constParms(3);
      uint64_t constParmsCount = 0;

      for (uint64_t i = 1; i < parms.size(); i++)
      {
        ConstantColumn* cc = dynamic_cast<ConstantColumn*>(parms[i]->data());

        if (cc)
        {
          vector<string> cv;
          auto str = cc->constval().safeString("");
          boost::split(cv, str, boost::is_any_of("."));

          if (cv.size() == 3)
          {
            constParms[1].push_back(cv[0]);
            constParms[2].push_back(cv[1]);
            constParms[0].push_back(cv[2]);
            constParmsCount++;
          }
        }
      }

      // Only when every list entry was a three-part constant.
      if (constParmsCount == listSize)
      {
        const funcexp::FunctionParm& pcs = parm0Fc->functionParms();
        // The loop below indexes pcs[0..2].
        idbassert(pcs.size() >= 3);

        for (uint64_t i = 0; i < 3; i++)
        {
          PseudoColumn* pc = dynamic_cast<PseudoColumn*>(pcs[i]->data());
          idbassert(pc);

          JobStepVector sv = doPseudoColumnInFilter(pc, constParms[i], jobInfo);
          jsv.insert(jsv.end(), sv.begin(), sv.end());
        }
      }

      // put the separate filtered resulted together
      JobStepVector sv = doExpressionFilter(n, jobInfo);
      jsv.insert(jsv.end(), sv.begin(), sv.end());
    }
    else if (parm0Pc != NULL)
    {
      vector<string> values;

      for (uint64_t i = 1; i < parms.size(); i++)
      {
        ConstantColumn* cc = dynamic_cast<ConstantColumn*>(parms[i]->data());

        if (cc)
          values.push_back(cc->constval().safeString(""));
      }

      // Only when every list entry was a constant.
      if (values.size() == listSize)
      {
        JobStepVector sv = doPseudoColumnInFilter(parm0Pc, values, jobInfo);
        jsv.insert(jsv.end(), sv.begin(), sv.end());
      }
    }
  }

  // Nothing specialised was produced: evaluate the function as a plain expression.
  if (jsv.empty())
    jsv = doExpressionFilter(n, jobInfo);

  return jsv;
}

#if 0
void doAND(JobStepVector& jsv, JobInfo& jobInfo)
{
//	idbassert(jobInfo.stack.size() >= 2);
    if (jobInfo.stack.size() < 2)
        return;

    JobStepVector rhv = jobInfo.stack.top();
    jobInfo.stack.pop();
    jsv = jobInfo.stack.top();
    jobInfo.stack.pop();

    // if the jobstep vector on any side of the operator is empty,
    // just push them to the stack without further process
    // # empty vector could be the result of an unhandled filter #
    if (jsv.size() == 0 || rhv.size() == 0)
    {
        jsv.insert(jsv.end(), rhv.begin(), rhv.end());
        jobInfo.stack.push(jsv);
        return;
    }

    //We need to do one optimization here. We can get multiple filters on the same column.
    // We should combine these here into one pColStep.
    // A query like "select LOG_KEY from F_TRANS where RECORD_TIMESTAMP between
    // '2007-01-01 00:00:00' and '2007-01-31 00:00:00';"  will cause this on the
    // RECORD_TIMESTAMP col
    // @bug 618 The filters are not always next to each other, so scan the whole jsv.

    if (tryCombineFilters(jsv, rhv, BOP_AND))
    {
        jobInfo.stack.push(jsv);
        return;
    }

    // concat rhv into jsv
    jsv.insert(jsv.end(), rhv.begin(), rhv.end());
    jobInfo.stack.push(jsv);
}


// NOTE: this overload sits inside the enclosing `#if 0` region and is not compiled.
//
// Merges the two topmost job step vectors of jobInfo.stack for an OR operator
// and pushes the result back onto the stack.
//
// n           [in] parse tree node of the OR; used when the branch has to be
//                  evaluated as an expression.
// jsv         [out] receives the left-hand vector and then the final result.
// jobInfo     [in/out] owner of the stack; two entries are popped and one is pushed.
// tryCombine  [in] when false, no attempt is made to combine the two sides.
void doOR(const ParseTree* n, JobStepVector& jsv, JobInfo& jobInfo, bool tryCombine)
{
    idbassert(jobInfo.stack.size() >= 2);
    // the right-hand operand is on top of the stack, the left-hand one beneath it
    JobStepVector rhv = jobInfo.stack.top();
    jobInfo.stack.pop();
    jsv = jobInfo.stack.top();
    jobInfo.stack.pop();

    // @bug3570, attempt to combine only if there is one column involved.
    // The left side qualifies when it is a single pColStep, two steps ending in a
    // pDictionaryStep, or three steps starting with a pDictionaryScan.
    if (tryCombine && ((jsv.size() == 1 && dynamic_cast<pColStep*>(jsv.begin()->get()) != NULL) ||
                       (jsv.size() == 2 && dynamic_cast<pDictionaryStep*>((jsv.end() - 1)->get()) != NULL) ||
                       (jsv.size() == 3 && dynamic_cast<pDictionaryScan*>(jsv.begin()->get()) != NULL)) &&
            tryCombineFilters(jsv, rhv, BOP_OR))
    {
        jobInfo.stack.push(jsv);
        return;
    }

    // OR is processed as an expression
    jsv = doExpressionFilter(n, jobInfo);
    jobInfo.stack.push(jsv);
}
#endif

// Builds the job steps for the OR/XOR branch rooted at n and hands them to
// JLF_ExecPlanToJobList::addJobSteps() (without AND-combining).
//
// n           [in/out] root of the branch; simple scalar filter nodes below it
//                      may be overwritten in place (see pass 1).
// jobInfo     [in/out] job context; receives the resulting steps and any
//                      parse tree created while converting scalar filters.
// tryCombine  [in] when true, try to fold every filter of the branch into one
//                  set of steps via tryCombineFilters(..., BOP_OR).
//
// Throws std::logic_error for operator or tree node types pass 2 cannot handle.
void doOR(ParseTree* n, JobInfo& jobInfo, bool tryCombine)
{
  JobStepVector combined;

  // Pass 1: convert simple scalar filter subqueries into parse trees so that
  // they can be evaluated as expressions.
  {
    // iterative post-order walk, because a node may be expanded while visited
    stack<ParseTree*> pending;
    ParseTree* cursor = n;
    ParseTree* previous = NULL;

    while (cursor || !pending.empty())
    {
      // keep descending along the left children first
      if (cursor)
      {
        pending.push(cursor);
        cursor = cursor->left();
        continue;
      }

      ParseTree* current = pending.top();
      ParseTree* rightChild = current->right();

      // the right subtree has not been walked yet
      if (rightChild && previous != rightChild)
      {
        cursor = rightChild;
        continue;
      }

      // both subtrees are done, visit the node itself
      pending.pop();
      previous = current;

      TreeNode* payload = current->data();

      if (TreeNode2Type(payload) != SIMPLESCALARFILTER)
        continue;

      SimpleScalarFilter* scalarFilter = dynamic_cast<SimpleScalarFilter*>(payload);
      ParseTree* replacement = NULL;

      if (simpleScalarFilterToParseTree(scalarFilter, replacement, jobInfo))
      {
        // overwrite this node with the content of the converted tree
        delete current->data();
        current->left(replacement->left());
        current->right(replacement->right());
        current->data(replacement->data());
        jobInfo.dynamicParseTreeVec.push_back(replacement);
      }
      else
      {
        // conversion failed; deleting a null pointer is a no-op
        delete replacement;
      }
    }
  }

  // Pass 2: try to merge all filters of the branch into one set of steps.
  if (tryCombine)
  {
    // iterative post-order walk of the OR branch
    stack<ParseTree*> pending;
    ParseTree* cursor = n;
    ParseTree* previous = NULL;
    bool combinable = true;

    while (combinable && (cursor || !pending.empty()))
    {
      if (cursor)
      {
        pending.push(cursor);
        cursor = cursor->left();
        continue;
      }

      ParseTree* current = pending.top();
      ParseTree* rightChild = current->right();

      if (rightChild && previous != rightChild)
      {
        cursor = rightChild;
        continue;
      }

      pending.pop();
      previous = current;

      TreeNode* payload = current->data();
      JobStepVector nodeSteps;

      switch (TreeNode2Type(payload))
      {
        case SIMPLEFILTER:
        {
          nodeSteps = doSimpleFilter(dynamic_cast<SimpleFilter*>(payload), jobInfo);
          break;
        }

        case CONSTANTFILTER:
        {
          nodeSteps = doConstantFilter(dynamic_cast<const ConstantFilter*>(payload), jobInfo);
          break;
        }

        case OPERATOR:
        {
          const Operator* op = static_cast<const Operator*>(payload);

          // AND / XOR inside the branch rules out combining; only OR may remain
          if (*op == opAND || *op == opand || *op == opXOR || *op == opxor)
            combinable = false;
          else if (*op != opOR && *op != opor)
            throw logic_error("doOR: unknow operator type.");

          break;
        }

        case SIMPLESCALARFILTER:
        {
          // expected to have been converted by pass 1; contributes no steps
          break;
        }

        case OUTERJOINONFILTER:
        case FUNCTIONCOLUMN:
        case ARITHMETICCOLUMN:
        case SIMPLECOLUMN:
        case CONSTANTCOLUMN:
        case EXISTSFILTER:
        case SELECTFILTER:
        {
          // these node types are never combined
          combinable = false;
          break;
        }

        case UNKNOWN:
        {
          cerr << boldStart << "doOR: Unknown" << boldStop << endl;
          throw logic_error("doOR: unknow type.");
        }

        default:
        {
          cerr << boldStart << "doOR: Not handled: " << TreeNode2Type(payload) << boldStop << endl;
          throw logic_error("doOR: Not handled treeNode type.");
        }
      }

      if (!combinable || nodeSteps.empty())
        continue;

      // the first filter seeds the result, later ones must combine with it
      if (combined.empty())
        combined = nodeSteps;
      else
        combinable = tryCombineFilters(combined, nodeSteps, BOP_OR);
    }

    // a partial combination is useless, discard it
    if (!combinable)
      combined.clear();
  }

  // nothing combined: the whole OR branch is processed as an expression
  if (combined.empty())
    combined = doExpressionFilter(n, jobInfo);

  JLF_ExecPlanToJobList::addJobSteps(combined, jobInfo, false);
}

}  // end of unnamed namespace

namespace joblist
{
// Entry point of the execution plan to job list conversion implemented by the
// functions in this file. Dispatches on the type of the tree node held by n:
// filter and column nodes are turned into job steps and added to jobInfo,
// AND operators recurse into both children, OR / XOR operators go to doOR().
// Throws std::logic_error for node or operator types that are not handled.
// @bug6131, pre-order traversing
/* static */ void JLF_ExecPlanToJobList::walkTree(execplan::ParseTree* n, JobInfo& jobInfo)
{
  TreeNode* tn = n->data();
  JobStepVector steps;
  const TreeNodeType nodeType = TreeNode2Type(tn);

  // whether addJobSteps() may AND-combine the new steps with existing ones
  bool combineWithExisting = false;

  switch (nodeType)
  {
    case OPERATOR:
    {
      const Operator* op = static_cast<const Operator*>(tn);

      if (*op == opAND || *op == opand)
      {
        // AND: each side is converted on its own
        if (n->left())
          walkTree(n->left(), jobInfo);

        if (n->right())
          walkTree(n->right(), jobInfo);
      }
      else if (*op == opOR || *op == opor)
      {
        doOR(n, jobInfo, true);
      }
      else if (*op == opXOR || *op == opxor)
      {
        // XOR goes through the OR path without attempting to combine filters
        doOR(n, jobInfo, false);
      }
      else
      {
        cerr << boldStart << "walkTree: only know how to handle 'and' and 'or' right now, got: " << *op
             << boldStop << endl;
        throw logic_error("walkTree: unknow operator type.");
      }

      return;
    }

    // these handlers take care of jobInfo themselves
    case SIMPLESCALARFILTER: doSimpleScalarFilter(n, jobInfo); return;

    case EXISTSFILTER: doExistsFilter(n, jobInfo); return;

    case SELECTFILTER: doSelectFilter(n, jobInfo); return;

    case SIMPLEFILTER:
      steps = doSimpleFilter(dynamic_cast<SimpleFilter*>(tn), jobInfo);
      combineWithExisting = true;
      break;

    case OUTERJOINONFILTER:
      steps = doOuterJoinOnFilter(dynamic_cast<OuterJoinOnFilter*>(tn), jobInfo);
      combineWithExisting = true;
      break;

    case CONSTANTFILTER: steps = doConstantFilter(dynamic_cast<const ConstantFilter*>(tn), jobInfo); break;

    case FUNCTIONCOLUMN: steps = doFunctionFilter(n, jobInfo); break;

    case ARITHMETICCOLUMN:
    case SIMPLECOLUMN: steps = doExpressionFilter(n, jobInfo); break;

    case CONSTANTCOLUMN: steps = doConstantBooleanFilter(n, jobInfo); break;

    case UNKNOWN:
      cerr << boldStart << "walkTree: Unknown" << boldStop << endl;
      throw logic_error("walkTree: unknow type.");

    default:
      /*
                  TREENODE,
                  FILTER,
                  RETURNEDCOLUMN,
                  AGGREGATECOLUMN,
                  TREENODEIMPL,
      */
      cerr << boldStart << "walkTree: Not handled: " << TreeNode2Type(tn) << boldStop << endl;
      throw logic_error("walkTree: Not handled treeNode type.");
  }

  // only the cases that produced job steps fall through to here
  JLF_ExecPlanToJobList::addJobSteps(steps, jobInfo, combineWithExisting);
}

// Adds the job steps in nsv to the job step vector kept on jobInfo.stack.
//
// nsv         [in] steps to add.
// jobInfo     [in/out] job context; its stack must hold at most one vector.
// tryCombine  [in] when true, first try to AND-combine nsv into the existing
//                  vector with tryCombineFilters(); the steps are appended only
//                  when that fails.
//
// If the stack is empty, nsv is pushed as its first entry.
void JLF_ExecPlanToJobList::addJobSteps(JobStepVector& nsv, JobInfo& jobInfo, bool tryCombine)
{
  idbassert(jobInfo.stack.size() < 2);

  // nothing collected so far: nsv becomes the stack's only entry
  if (jobInfo.stack.size() == 0)
  {
    jobInfo.stack.push(nsv);
    return;
  }

  JobStepVector& collected = jobInfo.stack.top();

  // a successful combine has already merged nsv into the collected steps
  if (tryCombine && tryCombineFilters(collected, nsv, BOP_AND))
    return;

  collected.insert(collected.end(), nsv.begin(), nsv.end());
}

}  // namespace joblist

#ifdef __clang__
#pragma clang diagnostic pop
#endif
